1、实时读取和处理Kafka中的数据。
利用faust.App创建Faust应用程序,并配置应用程序名称,Kafkabroker和序列模式。
接着,我们创建了一个主题,它与Kafka中的主题相对应。
Faust使用Python3.6+异步语法async,定义异步函数greet,并将其注册为Faust应用程序的agent。该函数接收实时数据集greetings,并异步输出每个数据。
import faust app = faust.App( 'hello-world', broker='kafka://localhost:9092', value_serializer='raw', ) greetings_topic = app.topic('greetings') @app.agent(greetings_topic) async def greet(greetings): async for greeting in greetings: print(greeting)
$ faust -A hello_world worker -l info
2、充分利用Python的类型提示,可以轻松定义数据模型。
import faust class Greeting(faust.Record): from_name: str to_name: str app = faust.App('hello-app', broker='kafka://localhost') topic = app.topic('hello-topic', value_type=Greeting) @app.agent(topic) async def hello(greetings): async for greeting in greetings: print(f'Hello from {greeting.from_name} to {greeting.to_name}') @app.timer(interval=1.0) async def example_sender(app): await hello.send( value=Greeting(from_name='Faust', to_name='you'), ) if __name__ == '__main__': app.main()
神龙|纯净稳定代理IP免费测试>>>>>>>>天启|企业级代理IP免费测试>>>>>>>>IPIPGO|全球住宅代理IP免费测试