gpt4 book ai didi

python - 如何在 Python 中测试 Server Sent Event API 响应?

转载 作者:行者123 更新时间:2023-12-04 16:45:02 27 4
gpt4 key购买 nike

我目前正在使用以下代码订阅服务器端事件 API:

import requests
import sseclient
import json
import pprint

def get_recent_change():
url = 'https://stream.wikimedia.org/v2/stream/recentchange'
response = requests.get(url, stream=True)
client = sseclient.SSEClient(response)
for event in client.events():
pprint.pprint(json.loads(event.data))
return

get_recent_change()

哪个有效!输出以下数据流(仅显示一个条目):

{'bot': True,
'comment': 'fixing url',
'id': 998191574,
'length': {'new': 664, 'old': 662},
'meta': {'domain': 'commons.wikimedia.org',
'dt': '2017-11-07T07:24:14+00:00',
'id': 'a8e00e00-c38c-11e7-b9f1-b083fecef9b0',
'offset': 526240012,
'partition': 0,
'request_id': 'e9052543-9df8-4ff6-a94e-06629f9aef0e',
'schema_uri': 'mediawiki/recentchange/1',
'topic': 'eqiad.mediawiki.recentchange',
'uri': 'https://commons.wikimedia.org/wiki/File:St_Cuthbert%27s_High_School_-_geograph.org.uk_-_71872.jpg'},
'minor': True,
'namespace': 6,
'parsedcomment': 'fixing url',
'patrolled': True,
'revision': {'new': 266424348, 'old': 155939348},
'server_name': 'commons.wikimedia.org',
'server_script_path': '/w',
'server_url': 'https://commons.wikimedia.org',
'timestamp': 1510039454,
'title': "File:St Cuthbert's High School - geograph.org.uk - 71872.jpg",
'type': 'edit',
'user': 'HiW-Bot',
'wiki': 'commonswiki'}

如何为 get_recent_change() 编写测试?我尝试的一切都挂起,因为流永远不会关闭......

有什么想法吗?

最佳答案

我想出了这种方式。我知道这不是最好的解决方案,但它确实有效!!在这种情况下,我只下载前 50 个有效载荷这是我的代码:

def get_recent_change(self) -> list:
# se inicializa un socket
r = requests.session()
# se establece la conexión
responds = r.get(self.__url, stream=True)
try:
# se capturan los eventos
client = sseclient.SSEClient(responds)
print('Connection established')
except Exception as e:
print(e)
# limitador del stram
cont = 0
payloads = []
try:
print('Downloading info')
for event in client.events():
#el primer evento no sirve
if cont==0:
pass
# captura muchos payloads en un solo evento, tocó separarlos
for st in event.data.split('\n'):
# Progress bar of the poor
print('-', end='',flush=True)
#se guardan en una lista
payloads.append(json.loads(st))
if cont >= 50:
break
cont +=1
except Exception as e:
print(e)
return payloads

关于python - 如何在 Python 中测试 Server Sent Event API 响应?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47152271/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com