python Faust库如何使用

630次阅读
没有评论

python

1、实时读取和处理Kafka中的数据。

利用faust.App创建Faust应用程序,并配置应用程序名称,Kafkabroker和序列模式。

接着,我们创建了一个主题,它与Kafka中的主题相对应。

Faust使用Python3.6+异步语法async,定义异步函数greet,并将其注册为Faust应用程序的agent。该函数接收实时数据集greetings,并异步输出每个数据。

import faust
 
app = faust.App(
    'hello-world',
    broker='kafka://localhost:9092',
    value_serializer='raw',
)
 
greetings_topic = app.topic('greetings')
 
@app.agent(greetings_topic)
async def greet(greetings):
    async for greeting in greetings:
        print(greeting)
$ faust -A hello_world worker -l info

2、充分利用Python的类型提示,可以轻松定义数据模型。

import faust
 
class Greeting(faust.Record):
    from_name: str
    to_name: str
 
app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)
 
@app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        print(f'Hello from {greeting.from_name} to {greeting.to_name}')
 
@app.timer(interval=1.0)
async def example_sender(app):
    await hello.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )
 
if __name__ == '__main__':
    app.main()
神龙|纯净稳定代理IP免费测试>>>>>>>>天启|企业级代理IP免费测试>>>>>>>>IPIPGO|全球住宅代理IP免费测试

相关文章:

版权声明:wuyou2021-04-26发表,共计1147字。
新手QQ群:570568346,欢迎进群讨论 Python51学习