gpt4 book ai didi

python - 为什么python客户端收不到SSE事件?

转载 作者:行者123 更新时间:2023-12-02 06:50:59 25 4
gpt4 key购买 nike

我有一个 python 客户端从带有 node.js API 的服务器监听 SSE 事件

流程是我通过 call_notification.py 向 node.js API 发送了一个事件并运行 seevents.py在循环中使用 run.sh (见下文)

但是我没有看到 python 客户端正在接收这个 SSE 事件?关于为什么会这样的任何指导?

call_notification.py

import requests
input_json = {'BATS':'678910','root_version':'12A12'}
url = 'http://company.com/api/root_event_notification?params=%s'%input_json
response = requests.get(url)
print response.text

Node .js API
app.get("/api/root_event_notification", (req, res, next) => {
console.log(req.query.params)
var events = require('events');
var eventEmitter = new events.EventEmitter();

//Create an event handler:
var myEventHandler = function () {
console.log('new_root_announced!');
res.status(200).json({
message: "New root build released!",
posts: req.query.params
});
}

seeevents.py(python 客户端监听 SSE 事件)
import json
import pprint
import sseclient

def with_urllib3(url):
"""Get a streaming response for the given event feed using urllib3."""
import urllib3
http = urllib3.PoolManager()
return http.request('GET', url, preload_content=False)

def with_requests(url):
"""Get a streaming response for the given event feed using requests."""
import requests
return requests.get(url, stream=True)

url = 'http://company.com/api/root_event_notification'
response = with_urllib3(url) # or with_requests(url)
client = sseclient.SSEClient(response)
#print client.events()
for event in client.events():
print "inside"
pprint.pprint(json.loads(event.data))

运行文件
#!/bin/sh

while [ /usr/bin/true ]
do
echo "Running sseevents.py"
python sseevents.py 2>&1 | tee -a sseevents.log.txt

echo "sleeping for 30 sec"
sleep 30
done

输出:-
Run call_notification.py on Terminal

node.js API OUTPUT

new_root_announced!
{'root_version': 'ABCD', 'BATS': '143'}

./run.sh --> DON'T SEE ABOVE EVENT below
Running sseevents.py
sleeping for 30 sec
Running sseevents.py
sleeping for 30 sec
Running sseevents.py
sleeping for 30 sec

最佳答案

非常简短的回答你的问题:

服务器代码没有将 SSE 消息发送回客户端。

为什么?因为你需要遵循SSE格式。

根据 JASON BUTZ in Server-Sent Events With Node

You should send a Connection: keep-alive header to ensure the client keeps the connection open as well. A Cache-Control header should be sent with the value no-cache to discourage the data being cached. Finally, the Content-Type needs to be set to text/event-stream.

With all of that done a newline (\n) should be sent to the client and then the events can be sent. Events must be sent as strings, but what is in that string doesn’t matter. JSON strings are perfectly fine.

Event data must be sent in the format "data: <DATA TO SEND HERE>\n".

It’s important to note that at the end of each line should be a newline character. To signify the end of an event an extra newline character needs to be added as well.

Multiple data lines are perfectly fine.



对您的问题的详细回答:

根据 Eric Bidelman in html5rocks.com :

When communicating using SSEs, a server can push data to your app whenever it wants, without the need to make an initial request. In other words, updates can be streamed from server to client as they happen.



但是,为了实现这一点,客户端必须“开始”请求它并准备接收 消息(当它们发生时)。
  • “开始”是通过调用 SSE API 端点(在您的情况下,调用 Node.js API 代码)来完成的。
  • 准备工作是通过准备处理异步消息流来完成的。

  • SSEs open a single unidirectional channel between server and client.*



    * 重点是我的

    这意味着服务器具有到客户端的“直接” channel 。它不打算由不是“客户端”代码的其他一些进程/代码“启动”(打开)。

    假设来自 OP 评论...

    预期行为(详细)
  • 客户端 Alice 使用参数 {name: "Alice"} 调用 API 端点,没有(可见)发生。
  • ...然后客户端 Bob 使用参数 {name: "Bob"} 调用 API 端点, 客户端 Alice 收到一个 SSE,负载为 {name: "Bob", says: "Hi"} .
  • ...然后客户端 Carol 使用参数 {name: "Carol"} 调用 API 端点,客户端 Alice 和 Bob 各自收到一个 SSE,其有效载荷为 {name: "Carol", says: "Hi"} .
  • ...等等。每次新客户端使用参数调用 API 端点时,所有其他拥有“开放” channel 的客户端都会收到带有新“Hi”有效负载的 SSE。
  • ...然后客户端 Bob 与服务器“断开连接”,客户端 Alice、客户端 Carol 和所有具有“开放” channel 的客户端将收到一个带有有效负载 {name: "Bob", says: "Bye"} 的 SSE| .
  • ...等等。每次旧客户端与服务器“断开连接”时,所有其他拥有“打开” channel 的客户端都会收到带有新“再见”有效负载的 SSE。

  • 抽象行为
  • 每个要求“打开” channel 的新客户端发送一些参数或旧客户端与服务器“断开连接”,它们在服务器中引起和事件。
  • 每次在服务器中发生此类事件时,服务器都会向所有“开放” channel 发送带有参数的 SSE 消息和作为有效负载的消息。

  • 阻塞注意事项 每个拥有“开放” channel 的客户端都将“陷入”无限等待循环中等待事件发生。使用“线程”代码技术来避免阻塞是客户端设计的责任。

    代码

    您的 Python 客户端应该“要求”启动单个单向 channel 并继续等待,直到 channel 关闭 .不应以不同的 channel 结束并重新开始。它应该保持相同的 channel 打开。

    从网络的角度来看,它就像一个“长”响应,不会结束(直到 SSE 消息传递结束)。回应只是“不断来来去去”。

    您的 Python 客户端代码就是这样做的。我注意到它是从 sseclient-py library 使用的确切示例代码.

    Python 3.4 的客户端代码

    要包含要发送到服务器的参数,请使用 Requests 中的一些代码。图书馆 docs/#passing-parameters-in-urls .

    因此,混合这些样本,我们最终得到以下代码作为您的 Python 3.4 客户端:
    import json
    import pprint
    import requests
    import sseclient # sseclient-py

    # change the name for each client
    input_json = {'name':'Alice'}
    #input_json = {'name':'Bob'}
    #input_json = {'name':'Carol'}

    url = 'http://company.com/api/root_event_notification'
    stream_response = requests.get(url, params=input_json, stream=True)

    client = sseclient.SSEClient(stream_response)

    # Loop forever (while connection "open")
    for event in client.events():
    print ("got a new event from server")
    pprint.pprint(event.data)

    Python 2.7 的客户端代码

    要包含要发送到服务器的参数,请使用 urllib.urlencode() 在 URL 中将它们编码为查询参数。图书馆。

    使用 urllib3.PoolManager().request() 发出 http 请求所以你最终会得到一个流响应。

    请注意 sseclient库将事件数据作为 unicode 字符串返回。要将 JSON 对象转换回 python 对象(使用 python 字符串),请使用 byteify ,递归自定义函数(感谢 Mark Amery)。

    使用以下代码作为您的 Python 2.7 客户端:
    import json
    import pprint
    import urllib
    import urllib3
    import sseclient # sseclient-py

    # Function that returns byte strings instead of unicode strings
    # Thanks to:
    # [Mark Amery](https://stackoverflow.com/users/1709587/mark-amery)
    def byteify(input):
    if isinstance(input, dict):
    return {byteify(key): byteify(value)
    for key, value in input.iteritems()}
    elif isinstance(input, list):
    return [byteify(element) for element in input]
    elif isinstance(input, unicode):
    return input.encode('utf-8')
    else:
    return input

    # change the name for each client
    input_json = {'name':'Alice'}
    #input_json = {'name':'Bob'}
    #input_json = {'name':'Carol'}

    base_url = 'http://localhost:3000/api/root_event_notification'
    url = base_url + '?' + urllib.urlencode(input_json)

    http = urllib3.PoolManager()
    stream_response = http.request('GET', url, preload_content=False)

    client = sseclient.SSEClient(stream_response)

    # Loop forever (while connection "open")
    for event in client.events():
    print ("got a new event from server")
    pprint.pprint(byteify(json.loads(event.data)))

    现在,服务器代码应该:
  • 发出内部服务器 'hello' 事件 其他客户收听事件
  • “开通” channel
  • 注册以监听所有可能发生的服务器内部事件(这意味着保持 channel “打开”并且不在消息之间发送任何内容,只是保持 channel “打开”)。
  • 这包括发出内部服务器“再见”事件 其他客户当 channel 被客户端/网络关闭时监听事件(最后“结束”)。

  • 使用以下 Node.js API 代码:
    var EventEmitter = require('events').EventEmitter;
    var myEmitter = new EventEmitter;


    function registerEventHandlers(req, res) {
    // Save received parameters
    const myParams = req.query;

    // Define function that adds "Hi" and send a SSE formated message
    const sayHi = function(params) {
    params['says'] = "Hi";
    let payloadString = JSON.stringify(params);
    res.write(`data: ${payloadString}\n\n`);
    }

    // Define function that adds "Bye" and send a SSE formated message
    const sayBye = function(params) {
    params['says'] = "Bye";
    let payloadString = JSON.stringify(params);
    res.write(`data: ${payloadString}\n\n`);
    }

    // Register what to do when inside-server 'hello' event happens
    myEmitter.on('hello', sayHi);

    // Register what to do when inside-server 'goodbye' event happens
    myEmitter.on('goodbye', sayBye);

    // Register what to do when this channel closes
    req.on('close', () => {
    // Emit a server 'goodbye' event with "saved" params
    myEmitter.emit('goodbye', myParams);

    // Unregister this particular client listener functions
    myEmitter.off('hello', sayHi);
    myEmitter.off('goodbye', sayBye);
    console.log("<- close ", req.query);
    });
    }


    app.get("/api/root_event_notification", (req, res, next) => {
    console.log("open -> ", req.query);

    // Emit a inside-server 'hello' event with the received params
    myEmitter.emit('hello', req.query);

    // SSE Setup
    res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    });
    res.write('\n');

    // Register what to do when possible inside-server events happen
    registerEventHandlers(req, res);

    // Code execution ends here but channel stays open
    // Event handlers will use the open channel when inside-server events happen
    })

    ...继续引用 Eric Bidelman in html5rocks.com :

    Sending an event stream from the source is a matter of constructing a plaintext response, served with a text/event-stream Content-Type, that follows the SSE format. In its basic form, the response should contain a "data:" line, followed by your message, followed by two "\n" characters to end the stream



    在客户端代码中, sseclient-py library负责解释 SSE 格式,因此每次 两个 “\n”字符到达,库“迭代”一个新的“可迭代”对象(一个新事件),它具有 data属性与从服务器发送的消息。

    这就是我测试代码的方式
  • 使用 Node.js API 代码启动服务器
  • 运行一个只取消注释“Alice”行的客户端(在这个客户端控制台上还没有看到任何东西)。
  • 运行仅“Bob”行未注释的第二个客户端。第一个客户端“Alice”的控制台显示:鲍勃说“嗨” (在 Bob 的客户端控制台上还没有看到任何东西)。
  • 运行只有“Carol”行未注释的第三个客户端。 Alice 和 Bob 的控制台显示:卡罗尔说“嗨” (在 Carol 的客户端控制台上还没有看到任何东西)。
  • 停止/杀死 Bob 的客户。 Alice 和 Carol 的控制台显示:鲍勃说“再见” .

  • 所以,代码工作正常:)

    关于python - 为什么python客户端收不到SSE事件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57913442/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com