linux 部署 flink 1.15.1 并提交作业

发布时间:2022-08-13 13:33:57 作者:yexindonglai@163.com 阅读(2537)

下载 1.15.1

https://flink.apache.org/downloads.html#apache-flink-1151

部署模式分类

  • 会话模式
  • 应用模式
  • 单作业模式
1、会话模式

先启动一个集群,保持一个会话,然后通过客户端提交作业,所有作业都在一个会话执行;

会话模式适合规模小、执行时间短的大量作业;

2、应用模式

前两种模式应用代码都是在客户端运行,然后由客户端提交给jobmanager的,这种方式的弊端是:需要占用大量网络带宽,去下载依赖和把二进制数据发送给jobmanager,将会加重客户端资源消耗。
所以Application Mode的解决办法是:不需要客户端,直接把应用提交到jobmanager上运行,这意味着要为每个提交的应用单独启动一个jobmanager,也就是创建一个集群,

jobmanager执行完自己的应用将会关闭

应用模式与单作业模式,都是提交作业之后才创建集群;单作业模式是通过客户端来提交的,客户端解析出的每一个作业对应一个集群;而应用模式下,是直接由 JobManager 执行应用程序的,即使应用包含了多个作业,也只创建一个集群。此模式用的比较少,

3、单作业模式

为每个作业启动一个集群,只要客户端提交了一个作业,就为这个作业启动一个单独的集群,这个集群只为这个作业提供服务;其

一、独立会话模式(Standalone)-部署

flink只支持linux部署

1、解压

  1. tar -zvxf flink-1.15.1-bin-scala_2.12.tgz

2、修改配置文件

  1. vim conf/flink-conf.yaml
  2. # 修改以下内容
  3. jobmanager.rpc.address: 192.168.31.250 # 选择当前主机的ip地址,如果是云服务器,使用外网ip
  4. # JobManager将绑定到的主机接口,默认值为 localhost 禁止外部访问,设为0.0.0.0表示允许外部访问,设置错误的话 Available Task Slots 会显示0
  5. jobmanager.bind-host: 0.0.0.0
  6. # 任务插槽数量,相当于使用多少个线程来执行流
  7. taskmanager.numberOfTaskSlots: 2
  8. parallelism.default: 1
  9. web.submit.enable: true
  10. # 指定TaskManager主机的地址,单机部署的话,用localhost即可
  11. taskmanager.host: 192.168.31.250
  12. # web前端展示的端口,自己设置
  13. rest.port: 8081
  14. # 客户端应该用来连接到服务器的地址。注意:仅当高可用性配置为 NONE 时才考虑此选项
  15. rest.address: 192.168.31.250
  16. # 允许外部ip访问的地址,默认情况下是localhost,只能内部访问,改为0.0.0.0允许所有外部ip访问
  17. rest.bind-address: 0.0.0.0

3、修改master文件,

  1. vim conf/masters
  2. # 填写主节点的ip地址,如果是云服务器,使用外网ip
  3. 192.168.31.250:8081

4、修改 workers 文件

  1. vim conf/workers
  2. # 添加 taskManager 节点的ip地址列表,如果是单节点,只填写主节点ip地址即可
  3. 192.168.31.250
  4. 192.168.31.251
  5. 192.168.31.252

5、、启动

  1. bin/start-cluster.sh

启动成功后,命令行会显示如下信息

  1. [root@dev-server bin]# ./start-cluster.sh
  2. Starting cluster. # 启动集群
  3. Starting standalonesession daemon on host dev-server. # 启动会话模式的 作业调度器 jobmanager
  4. Starting taskexecutor daemon on host dev-server. # 启动任务管理器

通过jps命令可以看到已经启动的flink

  1. [root@dev-server bin]# jps
  2. 3010991 TaskManagerRunner # 任务调度器 taskManager
  3. 3010438 StandaloneSessionClusterEntrypoint # 会话模式的节点
  4. 3023395 Jps

说明:

  1. JobManager 的启动代码:standalonesession,实现类是:StandaloneSessionClusterEntrypoint
  2. TaskManager 的启动代码:taskexecutor,实现类是:TaskManagerRunner

6、、访问ui界面

  1. http://192.168.31.250:8081

7、、停止flink

  1. bin/stop-cluster.sh

二、提交作业

1、编写作业代码
新建maven项目,pom.xml 加入flink的依赖

  1. <properties>
  2. <java.version>1.8</java.version>
  3. <scala-binary-version>2.12</scala-binary-version>
  4. <flink-version>1.13.0</flink-version>
  5. <slf4j-version>1.7.30</slf4j-version>
  6. </properties>
  7. <dependencies>
  8. <!-- flink 依赖-->
  9. <dependency>
  10. <groupId>org.apache.flink</groupId>
  11. <artifactId>flink-java</artifactId>
  12. <version>${flink-version}</version>
  13. </dependency>
  14. <dependency>
  15. <groupId>org.apache.flink</groupId>
  16. <artifactId>flink-streaming-java_${scala-binary-version}</artifactId>
  17. <version>${flink-version}</version>
  18. </dependency>
  19. <!-- flink 客户端,主要做一些管理相关的工作,如果不需要,就不需要导入此依赖-->
  20. <dependency>
  21. <groupId>org.apache.flink</groupId>
  22. <artifactId>flink-clients_${scala-binary-version}</artifactId>
  23. <version>${flink-version}</version>
  24. </dependency>
  25. <!-- 日志相关依赖-->
  26. <dependency>
  27. <groupId>org.slf4j</groupId>
  28. <artifactId>slf4j-api</artifactId>
  29. <version>${slf4j-version}</version>
  30. </dependency>
  31. <dependency>
  32. <groupId>org.slf4j</groupId>
  33. <artifactId>slf4j-log4j12</artifactId>
  34. <version>${slf4j-version}</version>
  35. </dependency>
  36. <dependency>
  37. <groupId>org.apache.logging.log4j</groupId>
  38. <artifactId>log4j-to-slf4j</artifactId>
  39. <version>2.14.0</version>
  40. </dependency>
  41. </dependencies>

2、编写java代码

  1. package com.demo;/**
  2. * @author yexd
  3. */
  4. import org.apache.flink.api.common.typeinfo.Types;
  5. import org.apache.flink.api.java.tuple.Tuple2;
  6. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  7. import org.apache.flink.streaming.api.datastream.KeyedStream;
  8. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  9. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  10. import org.apache.flink.util.Collector;
  11. /**
  12. * @title: 无界流处理
  13. * @Author yexd
  14. * @Date: 2022/8/7 20:10
  15. * @Version 1.0
  16. */
  17. public class UnboundedStreamWord {
  18. static String ip = "192.168.31.250";
  19. static int port = 9879;
  20. /**
  21. * 先将文件中的每一行进行分词,然后统计每个单词出现的次数
  22. * @param args
  23. * @throws Exception
  24. */
  25. public static void main(String[] args) throws Exception {
  26. // 创建执行环境
  27. StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
  28. // 读取网络流,在linux系统输入命令 : nc -lk 8888 后,就可以进行通讯了,-lk表示保持当前的连接并持续监听8888端口
  29. DataStreamSource<String> stringDataStreamSource = executionEnvironment.socketTextStream(ip,port);
  30. // 将每行数据根据空格切割后进行分词,转换成二元组, FlatMapOperator<输入的数据类型, 输出的数据类型>
  31. SingleOutputStreamOperator<Tuple2<String, Long>> operator = stringDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
  32. // 将每行进行切割
  33. String[] words = line.split(" ");
  34. for (String word : words) {
  35. // 将每个单词转换成二元组进行输出,其中第一个 word 表示单词本身, 1L表示每个单词出现的次数,后面会用这个次数来进行统计单词出现的总数
  36. out.collect(Tuple2.of(word, 1L));
  37. }
  38. });
  39. // 返回分词后的结果,FlatMapOperator<输入的数据类型, 输出的数据类型>
  40. SingleOutputStreamOperator<Tuple2<String, Long>> returns = operator.returns(Types.TUPLE(Types.STRING, Types.LONG));
  41. // 按照分词进行分组,keyBy 参数中的 f0 表示根据第几个字段进行分组(从0开始), 很明显,Tuple2的第一个字段是String类型,也就是刚刚分好词后的单词
  42. KeyedStream<Tuple2<String, Long>, Object> tuple2UnsortedGrouping = returns.keyBy(data -> data.f0);
  43. // 分组内进行聚合统计,sum 中的参数1 表示根据第几个属性进行统计,Tuple2<String, Long> 很明显第二个属性是Long,在上面我们将这个属性都置为1了,所以会进行统计
  44. SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2UnsortedGrouping.sum(1);
  45. // 打印
  46. sum.print();
  47. // 启动执行
  48. executionEnvironment.execute();
  49. /**
  50. 打印结果:
  51. 4> (123,1)
  52. 5> (hello,1)
  53. 15> (456,1)
  54. 5> (hello,2)
  55. 4> (123,2)
  56. 5> (hello,3)
  57. 说明: 大于号前面的数字表示 线程的编号,表示使用不同的线程进行处理,也就是并行流
  58. */
  59. }
  60. }

3、打包,通过以下命令将项目打成 jar 包

  1. maven clean package

3、添加作业
在页面中选择 Submit New Job -> Add New ,

选择刚刚打好的jar包

上传后点击jar的名称,有些信息需要填写一下

说明:

  • Entry Class : jar包中 main 方法所在类的全类名
  • Parallelism : 并行度,就是用多线程去执行作业,调成多少就用多少个线程执行作业
  • Program Arguments : 传入main 方法的参数,多个参数用空格隔开
  • Savepoint Path :保存点路径,比如你作业执行到一半,但是flink服务器需要重启,就会先暂停作业,然后将执行到一半的作业保存起来,待重启后继续执行,这里配置就是保存的路径;如果不需要保存,为空就行

4、提交之前的改动
因为在java代码里面用的无界流处理,也就是说,数据是通过 socket 网络传输的,如果不先启动监听的话,现在盲目提交就会导致报错,而我的代码里监听了 192.168.31.250 的 9879端口, 所以需要在 192.168.31.250 的服务器上输入以下命令来监听 9879 的端口

  1. # -lk表示保持当前的连接并持续监听9879端口
  2. nc -lk 9879

5、提交

以下是我的配置,然后点击 Submit 就可以提交了

提交后 一次点击左边的菜单栏 Jobs -> Running Jobs ,就可以可以看到刚刚提交的任务了,点进去看看

说明:

  • 绿色的RUNNING 表示正在运行中,如果是红色的字体,就表示有错误
  • RUNNING旁边绿色的 2 表示并行度,表示有2个线程执行这个作业
  • 底部表格展示的是运行的时长、数据流大小、任务数量等信息
  • Cancel Job : 可通过此按钮来停止作业

6、往flink发送消息
刚刚启动了 linux 监听了 9879 端口,发送了2条信息

然后依次点击 TaskManager -> 任务id

最后点击 Stout 就可以看到输入的内容了

关键字flink