当前位置:Java -> Apache Flink

Apache Flink

Apache Flink是由Apache软件基金会开发的开源统一流处理和批处理框架。 Apache Flink 的核心是用Java和Scala编写的分布式流式数据流引擎。 Flink以数据并行和管道方式执行任意数据流程序。本教程将逐步向您展示如何将Astra作为Flink计算结果的接收器。

此代码旨在作为演示如何使Apache Flink作业与Astra交互的简单演示。 这里肯定有优化的空间。

下面的图表显示了pyFlink和 AstraDB 的批处理能力以及此集成为数据密集型应用程序开启的全新可能性。

先决条件

  • 您的系统中应安装了 GradleJava
  • Python v3.7 或更高版本

设置Flink

  1. 打开一个终端,使用以下命令克隆 GitHub Flink 代码库git clone
  2. 切换到`flink-astra-stock-price`文件夹并安装requirements.txt文件中列出的依赖项。cd flink-astra-stock-price
  3. 为此插件安装所需的软件包运行以下命令。pip install -r requirements.txt
  4. 打开AlphaVantage API。选择*Get Free API Key*。
  5. 填写表格以创建您的密钥,并单击*GET FREE API KEY*。
  6. 通过电子邮件接收凭证并将它们添加到“my_local_secrets.py”文档中。

设置您的Astra数据库

  1. 创建Astra数据库。 将数据库命名为“flink”,键空间为`example`。
  2. 将新创建的数据库中的键空间名称添加到 `my_local_secrets.py` 中。
  3. 选择*生成令牌*。下载令牌以连接到您的数据库。
    1. 将`client_id`,`client_secret`和`client_token`添加到`my_local_secrets.py`文件的相应字段中。
    2. 单击*Close*。
  4. 单击*获取套件*以下载*安全连接套件*(SCB)。上传该套件到您的编码环境。确保在`my_local_secrets`文件中引用您上传文档的位置。

注意:

永远不要与他人分享您的令牌或套件。 它是关于您的数据库的几个数据集的集合,可用于访问它。

  1. 将SCB移动到GitHub目录中的app/src/main/resources(无需解压文件)。
  2. 创建名为app.properties的属性文件,并将其放在app/src/main/resources/中。
  3. 添加指定您的Astra客户ID,Astra秘钥和SCB文件名的属性。 这些应该映射到"astra.clientid","astra.secret"和"astra. scab"属性。您的app.properties文件应该类似于这样:
astra.clientid=Bwy...

astra.secret=E4dfE...

astra.scb=secure-connect-test.zip

Open the my_local_secrets.py file and fill in the following details.

client_id="<your-client_id>"

client_secret="<your-client_secret>"

token="<your-token>"

db_keyspace="<your-keyspace>"

secure_bundle_path="<path-to-bundle>/secure-connect-<YOUR_DB_NAME>.zip"

astra_id="<astra-id>"

astra_region="<astra-region>"

api_key="<your-alphavantage-api-key>"


安装Jupyter Lab

Jupyterlab是Jupyter Notebook的较高级版本。 除传统笔记本之外,它提供许多功能。

  1. 使用pip在您的计算机上安装Jupyter。 运行以下命令来执行安装。
  2. 如果`pip`不起作用,请尝试`pip3 install juypterlab`。pip install jupyterlab

  3. 在终端中输入 `jupyter-lab` 以打开一个窗口,其中列出了您工作目录的内容。 从克隆的目录中,找到要执行步骤的笔记本。
  4. 如果它没有自动启动,您可以通过单击终端中的URL来导航到JupyterLabs服务器。
  5. 从您的工作目录运行`jupyter`命令后,您可以在浏览器中看到项目树,并导航到文件。确保在启动任何编码之前按照下一步配置了秘密。
  6. 打开`local_secrets.py`文件,并填写从Astra Portal设置数据库详细信息后提供/提取的详细信息。
  7. 导航到笔记本

运行Jupyter Notebook

Jupyter Lab具有比传统笔记本更先进的功能。 与大多数Jupyter笔记本一样,您可以运行笔记本中的每个块。 如果您已将所有字段正确添加到my_local_secrets.py文件中,则笔记本将正确运行。

在笔记本中配置您的PyFlink数据流,以使其从Astra API中获取的数据正确配置。

此演示中用于获取数据的股票代码是“IBM”。 更改第三个块[3]以获取不同股票代码的数据。

我们的演示堆栈使用Python API,但是开发者社区支持多种其他语言的API。

定义API查询

AlphaVantage API可用于获取不同类型的数据。对于此演示,我们使用了流行的Intraday(Time_Series_Intraday)API。

查询目前设置为从API

  • 它在5分钟内的初始值,
  • 在5分钟内的最大值,
  • 在5分钟内的最低值,
  • 5分钟结束时的收盘量,以及
  • 该股票在5分钟内的交易量。

接下来,PyFlink将从AlphaVantage获取的数据结构化为数据帧,过滤掉任何交易量> 100000的数据字段,并创建一个数据帧用于上传数据到Astra。

写入Astra

笔记本中包含可以直接从笔记本运行的模式创建块。您还可以在Astra门户的CQL控制台中运行模式创建脚本。以下是模式的示例(也可在 `pyFlink_Astra_batch.ipynb` 文件的 [13] 中找到)。

CREATE TABLE if not exists market_stock_data (

    date text,

    open float,

    high float,

    low float,

    close float,

    volume float,

    PRIMARY KEY (date)

)


  1. 使用AstraDB RestAPI在 `pyflink_Astra_batch.ipynb` 文件中将数据插入Astra。已定义了一个名为send_to_rest_api的函数,该函数接受名为数据的单个参数。def send_to_rest_api(data):
  2. 使用格式化字符串为DataStax Astra REST API端点创建URL。端点URL与DataStax Astra中的`my_local_secrets.py`文件中指定的特定keyspace和table相关联。

然后,您的函数对数据对象中的每一行进行迭代,并为每一行向先前构建的URL发出POST请求。此POST请求的数据有效负载是一个JSON字符串,用于描述股票市场数据(日期,开盘价,最高价,最低价,收盘价和交易量)。

Datastream and Map

创建一个 DataStream 和 Map。在定义了`send_to_rest_api`函数之后,脚本似乎与名为ds的DataStream进行交互。

它使用 `lambda` 函数将 `send_to_rest_api` 函数映射到DataStream上。这意味着对于ds中的每个项目,会调用 `send_to_rest_api` 函数,然后将数据发送到REST API。

ds.map(lambda x: send_to_rest_api(x))

在笔记本中,与send_to_rest_api函数相关联了一些错误处理。

恭喜!您现在应该在您的Astra DB中拥有AlphaVantage数据。

测试和验证

完成上述部分以及上面的部分后,运行示例应用程序并验证Flink与Astra之间的连接。

  1. 在您的 `flink-astra` 克隆的GitHub目录中,运行 `./gradlew run`
  2. 验证应用程序是否运行并正常退出。如果成功,将出现以下消息:

在 31s 内构建成功

三个可执行任务:2已执行,一个为最新

  1. 返回Astra UI以使用CQL控制台。您可以运行此示例查询,以确认从示例应用程序定义的数据已正确加载:

token@cqlsh:example> select * from wordcount ;

word | count

--------+-------

dogs | 1

lazier | 1

least | 1

foxes | 1

jumped | 1

at | 1

are | 1

just | 1

quick | 1

than | 1

fox | 1

our | 1

dog | 2

or | 1

over | 1

brown | 1

lazy | 1

the | 2

(18 rows)

token@cqlsh:example>


推荐阅读: 剑指offer 17.打印从1到最大的n位数

本文链接: Apache Flink