- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个使用 EventStream 的 ReactJS 客户端和一个实现 SSE 的 golang 后端。
当我将浏览器连接到在本地主机上运行的后端时,以及当我的后端在具有端口转发功能的 k8s 上运行时,一切似乎都能正常工作。
一旦我创建了一个带有主机名的入口(这样我就不必一直进行端口转发)SSE 就停止工作了。我仍然看到客户端发送请求并且该请求被后端接收并注册。但是,当发送事件时,它永远不会到达我的 ReactJS 应用程序。
我附上了我的后端 SSE 实现的代码:
package sse
import (
"encoding/json"
"fmt"
"net/http"
"time"
"go.uber.org/zap"
"github.com/talon-one/towers/controller/api/log"
)
// the amount of time to wait when pushing a message to
// a slow client or a client that closed after `range clients` started.
const patience time.Duration = time.Second * 2
type customerStateUpdate struct {
sseEvent
CustomerName string `json:"customer_name"`
CustomerState string `json:"customer_state"`
}
type contentUpdate struct {
sseEvent
}
type sseEvent struct {
EventType string `json:"event_type"`
}
type Broker struct {
// Events are pushed to this channel by the main events-gathering routine
Notifier chan []byte
// New client connections
newClients chan chan []byte
// Closed client connections
closingClients chan chan []byte
// Client connections registry
clients map[chan []byte]bool
log *log.Logger
}
func NewBroker(log *log.Logger) (broker *Broker) {
// Instantiate a broker
broker = &Broker{
Notifier: make(chan []byte, 1),
newClients: make(chan chan []byte),
closingClients: make(chan chan []byte),
clients: make(map[chan []byte]bool),
log: log.With(zap.String("component", "SSE")),
}
// Set it running - listening and broadcasting events
go broker.listen()
return
}
func (broker *Broker) HandleContentChange() error {
event := contentUpdate{
sseEvent: sseEvent{EventType: "contentUpdate"},
}
payload, err := json.Marshal(&event)
if err != nil {
return err
}
broker.Notifier <- payload
return nil
}
func (broker *Broker) HandleCustomerStateChange(name, state string) error {
event := customerStateUpdate{
sseEvent: sseEvent{EventType: "customerStateUpdate"},
CustomerName: name,
CustomerState: state,
}
broker.log.Info("Sending SSE to registered clients", zap.String("name", name), zap.String("state", state))
payload, err := json.Marshal(&event)
if err != nil {
return err
}
broker.Notifier <- payload
return nil
}
func (broker *Broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
// Make sure that the writer supports flushing.
//
flusher, ok := rw.(http.Flusher)
if !ok {
http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
return
}
rw.Header().Set("Content-Type", "text/event-stream")
rw.Header().Set("Cache-Control", "no-cache")
rw.Header().Set("Connection", "keep-alive")
rw.Header().Set("Access-Control-Allow-Origin", "*")
// Each connection registers its own message channel with the Broker's connections registry
messageChan := make(chan []byte)
// Signal the broker that we have a new connection
broker.newClients <- messageChan
// Remove this client from the map of connected clients
// when this handler exits.
defer func() {
broker.closingClients <- messageChan
}()
notify := rw.(http.CloseNotifier).CloseNotify()
for {
select {
case <-notify:
return
case msg := <-messageChan:
// Write to the ResponseWriter
// Server Sent Events compatible
fmt.Fprintf(rw, "data: %s\n\n", msg)
// Flush the data immediately instead of buffering it for later.
flusher.Flush()
}
}
}
func (broker *Broker) listen() {
for {
select {
case s := <-broker.newClients:
// A new client has connected.
// Register their message channel
broker.clients[s] = true
broker.log.Info("Client added", zap.Int("current_count", len(broker.clients)))
case s := <-broker.closingClients:
// A client has detached and we want to
// stop sending them messages.
delete(broker.clients, s)
broker.log.Info("Client removed", zap.Int("current_count", len(broker.clients)))
case event := <-broker.Notifier:
// We got a new event from the outside!
// Send event to all connected clients
for clientMessageChan := range broker.clients {
select {
case clientMessageChan <- event:
case <-time.After(patience):
broker.log.Info("Skipping client")
}
}
}
}
}
在我的 ReactJS 应用中:
export default class CustomersTable extends Component {
constructor(props) {
super(props)
this.eventSource = new EventSource('/v1/events')
}
updateCustomerState(e) {
let event = JSON.parse(e.data)
switch (event.event_type) {
case 'customerStateUpdate':
let newData = this.state.customers.map(item => {
if (item.name === event.customer_name) {
item.k8sState = event.customer_state
}
return item
})
this.setState(Object.assign({}, { customers: newData }))
break
case 'contentUpdate':
this.reload()
break
default:
break
}
}
componentDidMount() {
this.setState({ isLoading: true })
ReactModal.setAppElement('body')
this.reload()
this.eventSource.onmessage = e => this.updateCustomerState(e)
}
componentWillUnmount() {
this.eventSource.close()
}
...
最佳答案
我的 SSE 应用在 Nginx Ingress 上使用:
annotations:
nginx.ingress.kubernetes.io/proxy-read-timeout: "21600"
nginx.ingress.kubernetes.io/eventsource: "true"
关于reactjs - 使用 Kubernetes 部署并通过 Ingress 连接后 SSE 中断,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58560048/
我在 GCP 上使用 k8s。需要设置入口来设置 TLS 连接,所以我为我的应用程序设置了入口,它有效!!! 顺便说一句,什么是入口 Controller ,如 Nginx Ingress Contr
我有一个响应 / 的后端服务,但我希望它在入口路由 myhost.com/overview 上运行。无论我尝试哪种配置,traefik 都不会删除路径 /overview - 我可以看到后端获取 /o
我正在尝试添加多个应该共享同一主机的 Ingress。 一个 Ingress 应该处理对 www.example.de/some 的请求,一个 Ingress 应该处理所有其他请求。 这是 Ingre
我正在尝试设置入口负载平衡器。 基本上,我有一个具有多个路径的后端服务。 假设我的后端 NodePort 服务名称是 hello-app。与此服务关联的 pod 公开多个路径,如/foo 和/bar。
我已经按照文档中的说明设置了 Nginx Controller https://docs.nginx.com/nginx-ingress-controller/installation/install
在 kubernetes 中,如果我在下面有一个入口资源,它如何知道要使用哪种类型的入口 Controller 或哪个入口 Controller (如果我有多个)? ” apiVersion: ext
如何为 ingress-nginx 中的所有主机使用自定义通配符 TLS 证书? 我使用 ingress-nginx 作为入口 Controller 。它是使用 Helm 图表安装的: helm re
有没有办法在 Kong-Ingress-Controller 中设置通配符证书以在每个 Ingress 中使用? 我从图表安装了Kong: $ helm repo add kong https://c
GKE 入口:https://cloud.google.com/kubernetes-engine/docs/concepts/ingress Nginx 入口:https://kubernetes.
此配置适用于其他集群,但不适用于我部署的最后一个集群。 我的 RBAC 配置存在某种问题。 kubectl get pods -n ingress-controller NAME
我有一个服务在 NodePort 服务上运行。我如何使用入口访问多个服务。 部署.yml apiVersion: apps/v1 kind: Deployment metadata: name:
我正在尝试在 GKE 中创建静态内部入口。似乎我们没有直接的方法。我遵循了 How to set static internal IP to the GKE internal Ingress 之后的解
我正在尝试让 GKE 入口要求像这样的基本身份验证 example from github. 入口工作正常。它路由到服务。但是身份验证不起作用。允许所有流量直接通过。 GKE 还没有推出这个功能吗?我
我目前正在将 IT 环境从 Nginx Ingress Gateway 迁移到 Kubernetes 上的 IstIO Ingress Gateway。 我需要迁移以下 Nginx 注释: nginx
我想在我的自托管系统上构建一个 gitlab + kubernetes 小 gitops。但是当我尝试从 gitlab kubernetes 部分安装 nginx ingress 时,出现此错误:Se
I'm struggling to set a global (on ingress controller scope) SSL/HTTPS redirection. It works fine
I'm struggling to set a global (on ingress controller scope) SSL/HTTPS redirection. It works fine
我正在尝试使用 Kong 插件进行 k8s 入口自定义。具体来说,我正在使用 Kong 入口 Controller 和“request-transformer-advanced”插件(引用: http
我使用 Kubespray 部署了一个裸机集群,并启用了 kubernetes 1.22.2、MetalLB 和 ingress-nginx。在设置 ingressClassName: nginx 时
我正在使用 NextJS,我需要它知道它何时在服务器或浏览器上发出请求。要在服务器端做,因为我是在微服务架构中构建的,我需要获取服务的服务名称和命名空间来完成这样的 url http://SERVIC
我是一名优秀的程序员,十分优秀!