
python - RxPy - Converting a live Twitter stream into an Rx Observable?

Reposted. Author: 行者123. Updated: 2023-12-01 03:34:41

I followed this great tutorial on using tweepy to consume a live Twitter stream in Python. The code below prints, in real time, tweets that mention RxJava, RxPy, RxScala, or ReactiveX.

from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
from rx import Observable, Observer

# Variables that contain the user credentials for accessing the Twitter API
access_token = "CONFIDENTIAL"
access_token_secret = "CONFIDENTIAL"
consumer_key = "CONFIDENTIAL"
consumer_secret = "CONFIDENTIAL"


# This is a basic listener that just prints received tweets to stdout.
class TweetObserver(StreamListener):

    def on_data(self, data):
        print(data)
        return True

    def on_error(self, status):
        print(status)



if __name__ == '__main__':

    # This handles Twitter authentication and the connection to the Twitter Streaming API
    l = TweetObserver()
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    stream = Stream(auth, l)

    # This line filters the Twitter stream to capture tweets matching the keywords below
    stream.filter(track=['rxjava','rxpy','reactivex','rxscala'])

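This is a perfect candidate to become a ReactiveX Observable through RxPy. But how exactly do I turn it into a hot source Observable? I can't seem to find documentation anywhere on how to use Observable.create()...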

Best answer

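I figured this out a while ago. You have to define a function that operates on the Observer it is passed, calling on_next() and on_error() on it as events arrive. You then pass that function to Observable.create().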
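To see the mechanism in isolation before the Twitter-specific code, here is a minimal stdlib-only sketch of the same pattern. It is not RxPy's implementation: `SimpleObservable`, `CallbackObserver`, and `fake_stream` are hypothetical names made up for illustration, and the "stream" is just a list of topics. The point it tries to show is that create() only stores the function, and the function runs (and starts pushing values into the observer) when someone subscribes.

```python
# Hypothetical stand-ins for illustration only; this is not RxPy's code.
class CallbackObserver:
    """Bundles the three callbacks an observer exposes."""

    def __init__(self, on_next, on_error=None, on_completed=None):
        self.on_next = on_next
        self.on_error = on_error or (lambda error: None)
        self.on_completed = on_completed or (lambda: None)


class SimpleObservable:
    """Stores a subscribe function and runs it once per subscriber."""

    def __init__(self, subscribe_fn):
        self._subscribe_fn = subscribe_fn

    @classmethod
    def create(cls, subscribe_fn):
        # Nothing is executed here; the function is only remembered.
        return cls(subscribe_fn)

    def subscribe(self, on_next, on_error=None, on_completed=None):
        observer = CallbackObserver(on_next, on_error, on_completed)
        # The source starts emitting only now, at subscription time.
        self._subscribe_fn(observer)


def fake_stream(topics):
    """Stand-in for a callback-driven source such as a stream listener."""

    def push_items(observer):
        try:
            for topic in topics:
                observer.on_next("tweet about " + topic)
            observer.on_completed()
        except Exception as exc:
            observer.on_error(exc)

    return SimpleObservable.create(push_items)


received = []
fake_stream(["rxpy", "reactivex"]).subscribe(on_next=received.append)
print(received)
```

In the answer's code that follows, the tweepy listener plays the role of `push_items`: its on_data and on_error callbacks forward to the observer instead of printing.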

from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
import json
from rx import Observable

# Variables that contain the user credentials for accessing the Twitter API
access_token = "PUT YOURS HERE"
access_token_secret = "PUT YOURS HERE"
consumer_key = "PUT YOURS HERE"
consumer_secret = "PUT YOURS HERE"


def tweets_for(topics):
    def observe_tweets(observer):
        # Forwards each stream callback to the subscribed observer.
        class TweetListener(StreamListener):
            def on_data(self, data):
                observer.on_next(data)
                return True

            def on_error(self, status):
                observer.on_error(status)

        # This handles Twitter authentication and the connection to the Twitter Streaming API
        l = TweetListener()
        auth = OAuthHandler(consumer_key, consumer_secret)
        auth.set_access_token(access_token, access_token_secret)
        stream = Stream(auth, l)
        stream.filter(track=topics)

    # share() lets multiple subscribers reuse one underlying stream connection
    return Observable.create(observe_tweets).share()


topics = ['Britain', 'France']

tweets_for(topics) \
    .map(lambda d: json.loads(d)) \
    .subscribe(on_next=lambda s: print(s), on_error=lambda e: print(e))
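One thing to watch with the pipeline above: the streaming endpoint can deliver messages that are not tweets (for example delete notices), and a blank or malformed payload would make json.loads raise inside map(). The helper below is a small sketch of a more defensive parse step. `extract_text` is a hypothetical name, and the sample payloads are invented for illustration rather than captured from the API.

```python
import json


def extract_text(raw):
    """Return the tweet text from one raw stream message, or None."""
    try:
        message = json.loads(raw)
    except (TypeError, ValueError):
        # Blank keep-alive lines or malformed payloads end up here.
        return None
    if not isinstance(message, dict):
        return None
    # Messages without a "text" field (such as delete notices) yield None.
    return message.get("text")


# Invented sample payloads: one tweet, one delete notice, one blank line.
samples = [
    '{"text": "RxPy is neat", "id": 1}',
    '{"delete": {"status": {"id": 2}}}',
    '',
]
texts = [t for t in map(extract_text, samples) if t is not None]
print(texts)
```

Assuming the same RxPy 1.x chaining style as the answer, this could replace the lambda as .map(extract_text) followed by .filter(lambda t: t is not None), so that only actual tweet text reaches the subscriber.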

This post is based on a similar question found on Stack Overflow, "python - RxPy - Converting a live Twitter stream into an Rx Observable?": https://stackoverflow.com/questions/40499153/
