- VisualStudio2022插件的安装及使用-编程手把手系列文章
- pprof-在现网场景怎么用
- C#实现的下拉多选框,下拉多选树,多级节点
- 【学习笔记】基础数据结构:猫树
上一篇我们分析了argo-workflow 中的 archive,包括 流水线GC、流水线归档、日志归档等功能。本篇主要分析 Workflow 中的几种触发方式,包括手动触发、定时触发、Event 事件触发等.
Argo Workflows 的流水线有多种触发方式:
CronWorkflow 本质上就是一个 Workflow + Cron Spec.
设计上参考了 k8s 中的 CronJob 。
一个简单的 CronWorkflow 如下:
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: test-cron-wf
spec:
schedule: "* * * * *"
concurrencyPolicy: "Replace"
startingDeadlineSeconds: 0
workflowSpec:
entrypoint: whalesay
templates:
- name: whalesay
container:
image: alpine:3.6
command: [sh, -c]
args: ["date; sleep 90"]
apply 一下,可以看到创建出来的 Workflow 命名为 $cronWorkflowName-xxx 。
[root@lixd-argo workdir]# k get cwf
NAME AGE
test-cron-wf 116s
[root@lixd-argo workdir]# k get wf
NAME STATUS AGE MESSAGE
test-cron-wf-1711852560 Running 47s
由于 template 中运行任务是 sleep 90s 因此,整个任务耗时肯定是超过 60s 的,根据设置的 concurrencyPolicy 为 Replace ,因此 60s 后,第二个 Workflow 被创建出来,第一个就会被停止掉.
[root@lixd-argo workdir]# k get wf
NAME STATUS AGE MESSAGE
test-cron-wf-1711852560 Failed 103s Stopped with strategy 'Terminate'
test-cron-wf-1711852620 Running 43s
支持的具体参数如下:
type CronWorkflowSpec struct {
// WorkflowSpec is the spec of the workflow to be run
WorkflowSpec WorkflowSpec `json:"workflowSpec" protobuf:"bytes,1,opt,name=workflowSpec,casttype=WorkflowSpec"`
// Schedule is a schedule to run the Workflow in Cron format
Schedule string `json:"schedule" protobuf:"bytes,2,opt,name=schedule"`
// ConcurrencyPolicy is the K8s-style concurrency policy that will be used
ConcurrencyPolicy ConcurrencyPolicy `json:"concurrencyPolicy,omitempty" protobuf:"bytes,3,opt,name=concurrencyPolicy,casttype=ConcurrencyPolicy"`
// Suspend is a flag that will stop new CronWorkflows from running if set to true
Suspend bool `json:"suspend,omitempty" protobuf:"varint,4,opt,name=suspend"`
// StartingDeadlineSeconds is the K8s-style deadline that will limit the time a CronWorkflow will be run after its
// original scheduled time if it is missed.
StartingDeadlineSeconds *int64 `json:"startingDeadlineSeconds,omitempty" protobuf:"varint,5,opt,name=startingDeadlineSeconds"`
// SuccessfulJobsHistoryLimit is the number of successful jobs to be kept at a time
SuccessfulJobsHistoryLimit *int32 `json:"successfulJobsHistoryLimit,omitempty" protobuf:"varint,6,opt,name=successfulJobsHistoryLimit"`
// FailedJobsHistoryLimit is the number of failed jobs to be kept at a time
FailedJobsHistoryLimit *int32 `json:"failedJobsHistoryLimit,omitempty" protobuf:"varint,7,opt,name=failedJobsHistoryLimit"`
// Timezone is the timezone against which the cron schedule will be calculated, e.g. "Asia/Tokyo". Default is machine's local time.
Timezone string `json:"timezone,omitempty" protobuf:"bytes,8,opt,name=timezone"`
// WorkflowMetadata contains some metadata of the workflow to be run
WorkflowMetadata *metav1.ObjectMeta `json:"workflowMetadata,omitempty" protobuf:"bytes,9,opt,name=workflowMeta"`
}
内容可以分为 3 部分:
WorkflowSpec 和 WorkflowMetadata 没太大区别,就不赘述了,分析一下 Cron Spec 相关的几个字段:
* * * * *
每分钟创建一次大部分字段和 K8s CronJob 一致 。
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: my-cron
spec:
schedule: "* * * * *"
concurrencyPolicy: "Replace"
startingDeadlineSeconds: 0
workflowSpec:
entrypoint: whalesay
templates:
- name: whalesay
container:
image: alpine:3.6
command: [sh, -c]
args: ["date; sleep 10"]
workflowMetadata:
labels:
from: cron
增加了 metadata,测试一下 。
[root@lixd-argo workdir]# k get wf my-cron-1711853400 -oyaml|grep labels -A 1
labels:
from: cron
可以看到,创建出来的 Workflow 确实携带上了,在 CronWorkflow 中指定的 label.
argo 提供了一个 Event API:/api/v1/events/{namespace}/{discriminator},该 API 可以接受任意 json 数据.
通过 event API 可以创建 Workflow ,类似于 Webhook.
具体请求长这样:
curl https://localhost:2746/api/v1/events/argo/ \
-H "Authorization: $ARGO_TOKEN" \
-d '{"message": "hello"}'
或者这样:
curl https://localhost:2746/api/v1/events/argo/my-discriminator \
-H "Authorization: $ARGO_TOKEN" \
-d '{"message": "hello"}'
创建 RBAC 相关对象,role、rolebinding、sa,其中 role 只需要提供最小权限即可.
直接创建在 default 命名空间 。
kubectl apply -f - <<EOF
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: test
rules:
- apiGroups:
- argoproj.io
resources:
- workfloweventbindings
verbs:
- list
- apiGroups:
- argoproj.io
resources:
- workflowtemplates
verbs:
- get
- apiGroups:
- argoproj.io
resources:
- workflows
verbs:
- create
EOF
serviceaccount 和 rolebinding 。
kubectl create sa test
kubectl create rolebinding test --role=test --serviceaccount=default:test
然后创建一个 Secret 。
kubectl apply -f - <<EOF
apiVersion: v1
kind: Secret
metadata:
name: test.service-account-token
annotations:
kubernetes.io/service-account.name: test
type: kubernetes.io/service-account-token
EOF
最后就可以查询 Secret 解析 Token 了 。
ARGO_TOKEN="Bearer $(kubectl get secret test.service-account-token -o=jsonpath='{.data.token}' | base64 --decode)"
echo $ARGO_TOKEN
Bearer ZXlKaGJHY2lPaUpTVXpJMU5pSXNJbXRwWkNJNkltS...
测试,能否正常使用 。
ARGO_SERVER=$(kubectl get svc argo-workflows-server -n argo -o=jsonpath='{.spec.clusterIP}')
curl http://$ARGO_SERVER:2746/api/v1/workflow-event-bindings/default -H "Authorization: $ARGO_TOKEN"
为了接收 Event,可以创建 WorkflowEventBinding 对象,具体如下:
apiVersion: argoproj.io/v1alpha1
kind: WorkflowEventBinding
metadata:
name: event-consumer
spec:
event:
# metadata header name must be lowercase to match in selector
selector: payload.message != "" && metadata["x-argo-e2e"] == ["true"] && discriminator == "my-discriminator"
submit:
workflowTemplateRef:
name: my-wf-tmple
arguments:
parameters:
- name: message
valueFrom:
event: payload.message
spec.event 指定了该 Binding 该如何匹配收到的 Event,比如这里的条件就是:
如果匹配则会使用 submit 下面指定的内容创建 Workflow:
至于创建出的 Workflow 则是由 my-wf-tmple 定义了,先创建这个 Template 。
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: my-wf-tmple
spec:
templates:
- name: main
inputs:
parameters:
- name: message
value: "{{workflow.parameters.message}}"
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
entrypoint: main
最后我们就可以发送 API 来触发 event 实现 Workflow 的创建 。
curl $ARGO_SERVER:2746/api/v1/events/default/my-discriminator \
-H "Authorization: $ARGO_TOKEN" \
-H "X-Argo-E2E: true" \
-d '{"message": "hello events"}'
测试一下:
{}[root@lixd-argo workdir]# curl $ARGO_SERVER:2746/api/v1/events/default/my-discriminator \
> -H "Authorization: $ARGO_TOKEN" \
> -H "X-Argo-E2E: true" \
> -d '{"message": "hello events"}'
{}[root@lixd-argo workdir]# k get wf
NAME STATUS AGE MESSAGE
my-wf-tmple-ea81n Running 5s
[root@lixd-argo workdir]# k get wf my-wf-tmple-ea81n -oyaml|grep parameters -A 5
parameters:
- name: message
value: hello events
可以看到,Workflow 已经创建出来了,而且参数也是我们发请求时给的 hello events.
默认情况下 argo-server 可以同时处理 64 个事件,再多就会直接返回 503 了,可以通过以下参数进行调整:
前面 Event 章节提到了可以通过发送 HTTP 请求的方式来创建触发 event 以 Workflow,但是需要客户端提供 AuthToken.
问题来了,对于一些不能指定 Token 的客户端来说就比较麻烦了,比如 Github、Gitlab 等 Git 仓库,都可以配置 Webhook,在收到 commit 的时候调用 Webhook 来触发流水线.
此时,这些发送过来的请求肯定是没有带 Token 的,因此需要额外配置来进行验证,保证 argo 只处理来自 Github、Gitlab 等等平台的 Webhook 请求.
第一步 Token 和 Event 章节一致,就不在赘述了,主要是第二步.
上一步,创建 RBAC 对象,准备好 Secret 之后,一般客户端都是解析 Secret 中的 Token,然后带上该 Token 发送请求,就像这样:
ARGO_SERVER=$(kubectl get svc argo-workflows-server -n argo -o=jsonpath='{.spec.clusterIP}')
ARGO_TOKEN="Bearer $(kubectl get secret jenkins.service-account-token -o=jsonpath='{.data.token}' | base64 --decode)"
curl https://$ARGO_SERVER:2746/api/v1/events/default/ \
-H "Authorization: $ARGO_TOKEN" \
-d '{"message": "hello"}'
但是,对于 Webhook 客户端来说,是没办法这样指定 token 的,因此需要通过argo-workflows-webhook-clients 配置来告诉 argo,哪个 Webhook 使用哪个 Secret 中的 token.
创建一个名为argo-workflows-webhook-clients 的 Secret,内容大致是这样的:
kind: Secret
apiVersion: v1
metadata:
name: argo-workflows-webhook-clients
# The data keys must be the name of a service account.
stringData:
# https://support.atlassian.com/bitbucket-cloud/docs/manage-webhooks/
bitbucket.org: |
type: bitbucket
secret: "my-uuid"
# https://confluence.atlassian.com/bitbucketserver/managing-webhooks-in-bitbucket-server-938025878.html
bitbucketserver: |
type: bitbucketserver
secret: "shh!"
# https://developer.github.com/webhooks/securing/
github.com: |
type: github
secret: "shh!"
# https://docs.gitlab.com/ee/user/project/integrations/webhooks.html
gitlab.com: |
type: gitlab
secret: "shh!"
以 Github 具体,secret 配置如下:
在添加 Webhook 时可以填一个 Secret 配置,实际就是一串加密字符,随便填什么都可以.
这样 Github 发送 Webhook 请求时就会携带上这个 Secret 信息,Argo 收到后就根据argo-workflows-webhook-clients 的 Secret 里配置的 type=github 的 secret 字段进行对比,如果匹配上就处理,否则就忽略该请求.
如果能匹配上就从对应的 Serviceaccount 中解析 Token 作为 Authorization 信息.
Webhook 这一块,官方文档不是很详细,一笔带过了,因此翻了下源码.
这块逻辑以一个 Interceptor 的形式出现,对于所有 Event API 都会经过该逻辑,用于为没有携带 Authorization 的请求添加 Authorization 信息.
// Interceptor creates an annotator that verifies webhook signatures and adds the appropriate access token to the request.
func Interceptor(client kubernetes.Interface) func(w http.ResponseWriter, r *http.Request, next http.Handler) {
return func(w http.ResponseWriter, r *http.Request, next http.Handler) {
err := addWebhookAuthorization(r, client)
if err != nil {
log.WithError(err).Error("Failed to process webhook request")
w.WriteHeader(403)
// hide the message from the user, because it could help them attack us
_, _ = w.Write([]byte(`{"message": "failed to process webhook request"}`))
} else {
next.ServeHTTP(w, r)
}
}
}
调用 addWebhookAuthorization 尝试添加认证信息.
func addWebhookAuthorization(r *http.Request, kube kubernetes.Interface) error {
// try and exit quickly before we do anything API calls
if r.Method != "POST" || len(r.Header["Authorization"]) > 0 || !strings.HasPrefix(r.URL.Path, pathPrefix) {
return nil
}
parts := strings.SplitN(strings.TrimPrefix(r.URL.Path, pathPrefix), "/", 2)
if len(parts) != 2 {
return nil
}
namespace := parts[0]
secretsInterface := kube.CoreV1().Secrets(namespace)
ctx := r.Context()
webhookClients, err := secretsInterface.Get(ctx, "argo-workflows-webhook-clients", metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get webhook clients: %w", err)
}
// we need to read the request body to check the signature, but we still need it for the GRPC request,
// so read it all now, and then reinstate when we are done
buf, _ := io.ReadAll(r.Body)
defer func() { r.Body = io.NopCloser(bytes.NewBuffer(buf)) }()
serviceAccountInterface := kube.CoreV1().ServiceAccounts(namespace)
for serviceAccountName, data := range webhookClients.Data {
r.Body = io.NopCloser(bytes.NewBuffer(buf))
client := &webhookClient{}
err := yaml.Unmarshal(data, client)
if err != nil {
return fmt.Errorf("failed to unmarshal webhook client \"%s\": %w", serviceAccountName, err)
}
log.WithFields(log.Fields{"serviceAccountName": serviceAccountName, "webhookType": client.Type}).Debug("Attempting to match webhook request")
ok := webhookParsers[client.Type](client.Secret, r)
if ok {
log.WithField("serviceAccountName", serviceAccountName).Debug("Matched webhook request")
serviceAccount, err := serviceAccountInterface.Get(ctx, serviceAccountName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get service account \"%s\": %w", serviceAccountName, err)
}
tokenSecret, err := secretsInterface.Get(ctx, secrets.TokenNameForServiceAccount(serviceAccount), metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get token secret \"%s\": %w", tokenSecret, err)
}
r.Header["Authorization"] = []string{"Bearer " + string(tokenSecret.Data["token"])}
return nil
}
}
return nil
}
具体流程如下:
第三步会直接使用 key 作为 serviceaccount,这也就是为什么配置argo-workflows-webhook-clients时需要把 serviceaccount 名称做为 key.
【ArgoWorkflow 系列】持续更新中,搜索公众号【探索云原生】订阅,阅读更多文章.
本文主要分析了 Argo 中的 Workflow 的几种触发方式.
argo-workflows-webhook-clients
配置好不同来源的 Webhook 使用的 Secret 以实现认证,这样就可以把 Event API 用作 Webhook 端点 配置到 Github、Gitlab 等环境了。最后此篇关于ArgoWorkflow教程(五)---Workflow的多种触发模式:手动、定时任务与事件触发的文章就讲到这里了,如果你想了解更多关于ArgoWorkflow教程(五)---Workflow的多种触发模式:手动、定时任务与事件触发的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我正在做一个关于代码学院的教程,我在这里收到一个错误,说“看起来你的函数没有返回‘唉,你没有资格获得信用卡。资本主义就是这样残酷。’”当收入参数为 75 时。”但是该字符串在控制台中返回(由于某种原因
我正在阅读 Go 的官方教程,但很难理解 Channel 和 Buffered Channels 之间的区别。教程的链接是 https://tour.golang.org/concurrency/2和
就目前而言,这个问题不适合我们的问答形式。我们希望答案得到事实、引用资料或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
关闭。这个问题是off-topic .它目前不接受答案。 想改进这个问题? Update the question所以它是on-topic对于堆栈溢出。 9年前关闭。 Improve this que
已关闭。此问题不符合Stack Overflow guidelines 。目前不接受答案。 要求我们推荐或查找工具、库或最喜欢的场外资源的问题对于 Stack Overflow 来说是偏离主题的,因为
作为 iOS 新手,有大量书籍可以满足学习基础知识的需求。现在,我想转向一些高级阅读,例如 OAuth 和 SQLite 以及动态 API 派生的 TableView 等。您可以推荐任何资源吗? 最佳
就目前而言,这个问题不适合我们的问答形式。我们希望答案得到事实、引用资料或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
关闭。这个问题是opinion-based .它目前不接受答案。 想要改进这个问题? 更新问题,以便 editing this post 可以用事实和引用来回答它. 关闭 8 年前。 Improve
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 我们不允许提问寻求书籍、工具、软件库等的推荐。您可以编辑问题,以便用事实和引用来回答。 关闭 8 年前。
前言 很多同学都知道,我们常见的CTF赛事除了解题赛之外,还有一种赛制叫AWD赛制。在这种赛制下,我们战队会拿到一个或多个服务器。服务器的连接方式通常是SSH链接,并且可能一个战队可能会同时有
Memcached是一个自由开源的,高性能,分布式内存键值对缓存系统 Memcached 是一种基于内存的key-value存储,用来存储小块的任意数据(字符串、对象),这些数据可以是数据库调用、A
Perl 又名实用报表提取语言, 是 Practical Extraction and Report Language 的缩写 Perl 是由 拉里·沃尔(Larry Wall)于19
WSDL 是 Web Services Description Language 的缩写,翻译成中文就是网络服务描述语言 WSDL 是一门基于 XML 的语言,用于描述 Web Services 以
关闭。这个问题不满足Stack Overflow guidelines .它目前不接受答案。 想改善这个问题吗?更新问题,使其成为 on-topic对于堆栈溢出。 6年前关闭。 Improve thi
我正在寻找解释在 WPF 中创建自定义用户控件的教程。 我想要一个控件,它结合了一个文本 block 、一个文本框和一个启动通用文件打开对话框的按钮。我已经完成了布局,一切都连接好了。它有效,但它是三
就目前而言,这个问题不适合我们的问答形式。我们希望答案得到事实、引用资料或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
我接近 fourth page of the Django tutorial 的开始看着vote查看,最后是这样的: # Always return an HttpResponseRedirect a
就目前而言,这个问题不适合我们的问答形式。我们希望答案得到事实、引用资料或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
是否有任何好的 Qt QSS 教程,或者在某个地方我可以看到样式小部件的示例?如果某处可用,我想要一些完整的引用。除了有关如何设置按钮或某些选项卡样式的小教程外,我找不到任何其他内容。 最佳答案 Qt
就目前而言,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引起辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the he
我是一名优秀的程序员,十分优秀!