当前位置:Java -> Apache Flink
Apache Flink是由Apache软件基金会开发的开源统一流处理和批处理框架。 Apache Flink 的核心是用Java和Scala编写的分布式流式数据流引擎。 Flink以数据并行和管道方式执行任意数据流程序。本教程将逐步向您展示如何将Astra作为Flink计算结果的接收器。
此代码旨在作为演示如何使Apache Flink作业与Astra交互的简单演示。 这里肯定有优化的空间。
下面的图表显示了pyFlink和 AstraDB 的批处理能力以及此集成为数据密集型应用程序开启的全新可能性。
git clone
cd flink-astra-stock-price
pip install -r requirements.txt
单击*获取套件*以下载*安全连接套件*(SCB)。上传该套件到您的编码环境。确保在`my_local_secrets`文件中引用您上传文档的位置。
注意:
永远不要与他人分享您的令牌或套件。 它是关于您的数据库的几个数据集的集合,可用于访问它。
astra.clientid=Bwy...
astra.secret=E4dfE...
astra.scb=secure-connect-test.zip
Open the my_local_secrets.py file and fill in the following details.
client_id="<your-client_id>"
client_secret="<your-client_secret>"
token="<your-token>"
db_keyspace="<your-keyspace>"
secure_bundle_path="<path-to-bundle>/secure-connect-<YOUR_DB_NAME>.zip"
astra_id="<astra-id>"
astra_region="<astra-region>"
api_key="<your-alphavantage-api-key>"
Jupyterlab是Jupyter Notebook的较高级版本。 除传统笔记本之外,它提供许多功能。
如果`pip`不起作用,请尝试`pip3 install juypterlab`。pip install jupyterlab
Jupyter Lab具有比传统笔记本更先进的功能。 与大多数Jupyter笔记本一样,您可以运行笔记本中的每个块。 如果您已将所有字段正确添加到my_local_secrets.py文件中,则笔记本将正确运行。
在笔记本中配置您的PyFlink数据流,以使其从Astra API中获取的数据正确配置。
此演示中用于获取数据的股票代码是“IBM”。 更改第三个块[3]以获取不同股票代码的数据。
我们的演示堆栈使用Python API,但是开发者社区支持多种其他语言的API。
AlphaVantage API可用于获取不同类型的数据。对于此演示,我们使用了流行的Intraday(Time_Series_Intraday)API。
查询目前设置为从API
接下来,PyFlink将从AlphaVantage获取的数据结构化为数据帧,过滤掉任何交易量> 100000的数据字段,并创建一个数据帧用于上传数据到Astra。
笔记本中包含可以直接从笔记本运行的模式创建块。您还可以在Astra门户的CQL控制台中运行模式创建脚本。以下是模式的示例(也可在 `pyFlink_Astra_batch.ipynb` 文件的 [13] 中找到)。
CREATE TABLE if not exists market_stock_data (
date text,
open float,
high float,
low float,
close float,
volume float,
PRIMARY KEY (date)
)
def send_to_rest_api(data):
然后,您的函数对数据对象中的每一行进行迭代,并为每一行向先前构建的URL发出POST请求。此POST请求的数据有效负载是一个JSON字符串,用于描述股票市场数据(日期,开盘价,最高价,最低价,收盘价和交易量)。
创建一个 DataStream 和 Map。在定义了`send_to_rest_api`函数之后,脚本似乎与名为ds的DataStream进行交互。
它使用 `lambda` 函数将 `send_to_rest_api` 函数映射到DataStream上。这意味着对于ds中的每个项目,会调用 `send_to_rest_api` 函数,然后将数据发送到REST API。
ds.map(lambda x: send_to_rest_api(x))
在笔记本中,与send_to_rest_api函数相关联了一些错误处理。
恭喜!您现在应该在您的Astra DB中拥有AlphaVantage数据。
完成上述部分以及上面的部分后,运行示例应用程序并验证Flink与Astra之间的连接。
在 31s 内构建成功
三个可执行任务:2已执行,一个为最新
返回Astra UI以使用CQL控制台。您可以运行此示例查询,以确认从示例应用程序定义的数据已正确加载:
token@cqlsh:example> select * from wordcount ;
word | count
--------+-------
dogs | 1
lazier | 1
least | 1
foxes | 1
jumped | 1
at | 1
are | 1
just | 1
quick | 1
than | 1
fox | 1
our | 1
dog | 2
or | 1
over | 1
brown | 1
lazy | 1
the | 2
(18 rows)
token@cqlsh:example>
推荐阅读: 剑指offer 17.打印从1到最大的n位数
本文链接: Apache Flink