Deploying a Session-Mode Flink Cluster on Kubernetes

Last updated: the evening of August 15, 2023

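In distributed computing, Apache Flink is a fast, reliable, and easy-to-use compute engine. A Flink cluster is a distributed system made up of a Flink JobManager and multiple Flink TaskManagers. High availability is an important consideration when deploying such a cluster. This article describes how to deploy a highly available Session-mode Flink cluster on Kubernetes (k8s), using MinIO as the file system.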

What is Session mode

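Flink's deployment modes describe how a cluster's lifecycle relates to the jobs it runs. In Application mode, a cluster is started for a single application; in Session mode, a long-running cluster accepts multiple job submissions. Separately, the cluster can run on different resource providers, such as standalone processes, Kubernetes, or YARN. This article deploys a Session cluster whose JobManager and TaskManager processes run in containers managed by k8s, communicate over Akka RPC, and can be started, stopped, or scaled as needed.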

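The advantages of running Session mode on k8s are:

  • The Flink cluster can be started or stopped as needed
  • TaskManagers can be added or removed dynamically
  • k8s scaling features can be used to adjust the size of the Flink cluster
  • The cluster integrates with other k8s resources, such as storage volumes and network policies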

For these reasons, Session mode is a common choice when several jobs share one Flink cluster on Kubernetes.

Flink's FileSystem abstraction

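While Flink processes data, that data may live in different file systems, such as the local file system, HDFS, or S3. To handle them uniformly, Flink introduces the FileSystem concept: an abstract interface that provides a single way of accessing different file systems.

The FileSystem implementation is selected through Flink's configuration. Flink supports several file systems, including the local file system, HDFS, S3, and Google Cloud Storage. Because MinIO implements the S3 protocol, MinIO can be used as the file system as well.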
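
The idea of choosing an implementation by URI scheme can be sketched as follows. This is only an illustration, not Flink's code: the `FILE_SYSTEMS` registry, its labels, and `resolve_file_system` are hypothetical names introduced here.

```python
from urllib.parse import urlparse

# Hypothetical registry: URI scheme -> label of the file system implementation.
# The labels are illustrative; they are not Flink class names.
FILE_SYSTEMS = {
    "file": "local file system",
    "hdfs": "HDFS",
    "s3": "S3-compatible object store (for example MinIO)",
}


def resolve_file_system(uri: str, default_scheme: str = "file") -> str:
    """Pick an implementation by URI scheme, falling back to a default scheme."""
    scheme = urlparse(uri).scheme or default_scheme
    try:
        return FILE_SYSTEMS[scheme]
    except KeyError:
        raise ValueError(f"no file system registered for scheme {scheme!r}")


print(resolve_file_system("s3://szyxflink/state/checkpoint"))
print(resolve_file_system("/opt/flink/tmp"))
```

A path without a scheme falls back to the default scheme, which is roughly the role that a setting such as `fs.default-scheme` plays in the configuration.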

Deploying a highly available Session-mode Flink cluster on k8s

Component versions

Component    Version
kubernetes   1.15.12
flink        1.15.3

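Building the image

Using MinIO as the file system requires the S3 file system plugin. The official image ships the jar under /opt/flink/opt but does not enable it, so we build our own image.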

Dockerfile:

FROM apache/flink:1.15.3-scala_2.12

# Extra jars used by the jobs
# Flink CDC connector for MySQL
COPY lib/flink-sql-connector-mysql-cdc-2.3.0.jar /opt/flink/lib/
# JDBC connector
COPY lib/flink-connector-jdbc-1.15.3.jar /opt/flink/lib/
# MySQL driver
COPY lib/mysql-connector-j-8.0.32.jar /opt/flink/lib/
# Oracle driver
COPY lib/ojdbc8-21.9.0.0.jar /opt/flink/lib/
# File system plugins must live in their own folder under the plugins directory
RUN mkdir -p /opt/flink/plugins/s3-fs-presto && cp -f /opt/flink/opt/flink-s3-fs-presto-1.15.3.jar /opt/flink/plugins/s3-fs-presto/

Build the image:

docker build -t sivdead/flink:1.15.3_scala_2.12 -f ./Dockerfile .

Configuration (ConfigMap)

The configuration has two parts: flink-conf.yaml and log4j-console.properties.

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  namespace: szyx-flink
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    kubernetes.cluster-id: szyx-flink
    # Namespace the cluster runs in
    kubernetes.namespace: szyx-flink
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 2867m
    parallelism.default: 2
    # File system
    fs.default-scheme: s3
    # MinIO endpoint
    s3.endpoint: https://minio.k8s.io:9000
    # MinIO bucket
    s3.flink.bucket: szyxflink
    s3.access-key: <minio access key>
    s3.secret-key: <minio secret key>
    # State backend
    state.backend: rocksdb
    s3.path.style.access: true
    blob.storage.directory: /opt/flink/tmp/blob
    web.upload.dir: /opt/flink/tmp/upload
    io.tmp.dirs: /opt/flink/tmp
    # State management
    # Checkpoint storage location
    state.checkpoints.dir: s3://szyxflink/state/checkpoint
    # Savepoint storage location
    state.savepoints.dir: s3://szyxflink/state/savepoint
    # Checkpoint interval in milliseconds (set once; a repeated key would override an earlier one)
    execution.checkpointing.interval: 5000
    execution.checkpointing.mode: EXACTLY_ONCE
    # Number of checkpoints to retain
    state.checkpoints.num-retained: 3
    # History server
    # Archive completed jobs to the following directory
    jobmanager.archive.fs.dir: s3://szyxflink/completed-jobs
    # Refresh every 10 seconds
    historyserver.archive.fs.refresh-interval: 10000
    historyserver.archive.fs.dir: s3://szyxflink/completed-jobs
    # High availability
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3://szyxflink/ha
    # The kubernetes.operator.* keys below are read by the Flink Kubernetes Operator;
    # in a manually managed deployment like this one they likely have no effect.
    # Trigger a savepoint every 6 hours
    kubernetes.operator.periodic.savepoint.interval: 6h
    kubernetes.operator.savepoint.history.max.age: 24h
    kubernetes.operator.savepoint.history.max.count: 5
    # Restart of unhealthy job deployments
    kubernetes.operator.cluster.health-check.enabled: true
    # Restart failed job deployments
    kubernetes.operator.job.restart.failed: true
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
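
Because flink-conf.yaml is a flat list of `key: value` lines, a key that is set twice is easy to overlook in a long file. The following is a minimal sketch of a check for repeated keys; `duplicate_keys` and the sample excerpt are hypothetical and only illustrate the idea.

```python
from collections import Counter
from typing import List


def duplicate_keys(conf_text: str) -> List[str]:
    """Return keys that appear more than once in flat `key: value` text."""
    keys = []
    for line in conf_text.splitlines():
        line = line.strip()
        # Skip blank lines, comments, and anything that is not a key/value pair.
        if not line or line.startswith("#") or ":" not in line:
            continue
        key, _, _ = line.partition(":")
        keys.append(key.strip())
    counts = Counter(keys)
    return [key for key, count in counts.items() if count > 1]


# Hypothetical excerpt, for illustration only.
sample = """
parallelism.default: 2
execution.checkpointing.interval: 10s
state.backend: rocksdb
execution.checkpointing.interval: 5000
"""
print(duplicate_keys(sample))
```

Running a check like this over the `flink-conf.yaml` section before applying the ConfigMap makes it clear which value is the intended one.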

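Adding a ServiceAccount and granting permissions

When deploying a Flink cluster on Kubernetes, create a ServiceAccount that authorizes the Flink pods to call the Kubernetes API. A ServiceAccount is a Kubernetes resource that gives pods an identity for accessing the API. With Kubernetes high availability enabled, the Flink JobManager and TaskManagers use this account to create, update, and watch the ConfigMaps that hold leader and recovery metadata.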

apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink-service-account
  namespace: szyx-flink

---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: szyx-flink
  name: flink
rules:
  - apiGroups: [""]
    resources: ["pods", "services", "configmaps"]
    # "update" and "patch" are added on the assumption that Kubernetes HA must modify its ConfigMaps
    verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
  - apiGroups: [""]
    resources: ["pods/log"]
    verbs: ["get"]
  - apiGroups: ["batch"]
    resources: ["jobs"]
    verbs: ["create", "get", "list", "watch", "delete"]
  - apiGroups: ["extensions"]
    resources: ["ingresses"]
    verbs: ["create", "get", "list", "watch", "delete"]

---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  namespace: szyx-flink
  name: flink-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: flink
subjects:
  - kind: ServiceAccount
    name: flink-service-account
    # Must match the namespace of the ServiceAccount defined above
    namespace: szyx-flink
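
To make the Role semantics concrete, here is a minimal sketch of how a list of rules grants or denies a request. It is a simplification under stated assumptions: `RULES` is an illustrative subset written as plain dicts, `is_allowed` is a hypothetical helper, and real RBAC features such as `*` wildcards and `resourceNames` are ignored.

```python
# Illustrative subset of Role rules, written as plain dicts.
RULES = [
    {"apiGroups": [""], "resources": ["pods/log"], "verbs": ["get"]},
    {
        "apiGroups": ["batch"],
        "resources": ["jobs"],
        "verbs": ["create", "get", "list", "watch", "delete"],
    },
]


def is_allowed(rules, api_group: str, resource: str, verb: str) -> bool:
    """True if a single rule lists the API group, the resource, and the verb."""
    return any(
        api_group in rule["apiGroups"]
        and resource in rule["resources"]
        and verb in rule["verbs"]
        for rule in rules
    )


print(is_allowed(RULES, "batch", "jobs", "create"))
print(is_allowed(RULES, "", "pods/log", "delete"))
```

Rules are purely additive: a request is allowed when at least one rule matches all three fields, and denied otherwise. A verb that no rule lists for a resource is therefore rejected by the API server.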

Deploying the JobManager

PVC mounted by the JobManager:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-tmp
  namespace: szyx-flink
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 40Gi

Deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: szyx-flink
spec:
  replicas: 1 # Set the value to greater than 1 to start standby JobManagers
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
        - name: jobmanager
          imagePullPolicy: Always
          image: sivdead/flink:1.15.3_scala_2.12
          env:
            # Inject the Pod IP into the container
            - name: POD_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.podIP
            # Time zone
            - name: TZ
              value: Asia/Shanghai
          # The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.
          args: ["jobmanager", "$(POD_IP)"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          resources:
            requests:
              memory: "8192Mi"
              cpu: "4"
            limits:
              memory: "8192Mi"
              cpu: "4"
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: tmp-dir
              mountPath: /opt/flink/tmp
      securityContext:
        runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
      serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps
      # Node selector
      nodeSelector:
        zone: mainland
      # Tolerations
      tolerations:
        - key: zone
          value: mainland
          effect: NoSchedule
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: tmp-dir
          persistentVolumeClaim:
            claimName: flink-tmp
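
The Flink process size configured in flink-conf.yaml has to fit inside the container memory limit, and the two settings use different unit suffixes. Below is a minimal sketch of that comparison. It assumes only the suffixes used in this article (`m`/`g` on the Flink side, which Flink reads as mebibytes and gibibytes, and `Mi`/`Gi` on the Kubernetes side); `to_bytes` and `fits` are hypothetical helpers.

```python
import re

# Bytes per unit for the small subset of suffixes this sketch understands.
FLINK_UNITS = {"m": 2**20, "g": 2**30}   # Flink: "m" = mebibytes, "g" = gibibytes
K8S_UNITS = {"Mi": 2**20, "Gi": 2**30}   # Kubernetes binary suffixes


def to_bytes(size: str, units: dict) -> int:
    """Convert a string such as '1600m' or '8192Mi' to bytes."""
    match = re.fullmatch(r"(\d+)\s*([A-Za-z]+)", size.strip())
    if not match or match.group(2) not in units:
        raise ValueError(f"unsupported size: {size!r}")
    return int(match.group(1)) * units[match.group(2)]


def fits(process_size: str, container_limit: str) -> bool:
    """True if the Flink process size is within the container memory limit."""
    return to_bytes(process_size, FLINK_UNITS) <= to_bytes(container_limit, K8S_UNITS)


print(fits("1600m", "8192Mi"))  # jobmanager.memory.process.size vs. the limit above
print(fits("2867m", "8192Mi"))  # taskmanager.memory.process.size
```

With the values in this article, both process sizes are well below the 8192Mi limit, so the containers have headroom beyond what Flink itself is configured to use.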

Service:

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: szyx-flink
spec:
  type: ClusterIP
  ports:
    - name: rpc
      port: 6123
    - name: blob-server
      port: 6124
    - name: webui
      port: 8081
  selector:
    app: flink
    component: jobmanager

Ingress:

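# extensions/v1beta1 matches the Kubernetes 1.15 used here; it was removed in
# Kubernetes 1.22, where networking.k8s.io/v1 is required instead.
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  annotations:
    # Jars may be uploaded through the web UI, so allow a larger request body
    nginx.ingress.kubernetes.io/proxy-body-size: 300m
    nginx.ingress.kubernetes.io/rewrite-target: /$1
  name: job-manager
  namespace: szyx-flink
spec:
  rules:
    - host: flink.k8s.io
      http:
        paths:
          - backend:
              serviceName: flink-jobmanager
              servicePort: 8081
            path: /flink/(.*)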
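
The `path` and `rewrite-target` pair strips the `/flink` prefix before the request reaches the JobManager. A minimal sketch of that mapping, using Python's `re` module in place of the ingress controller (the `rewrite` helper is hypothetical and only mirrors the capture-group substitution):

```python
import re
from typing import Optional

# The Ingress `path`; the capture group becomes $1 in the rewrite target.
PATH_PATTERN = re.compile(r"^/flink/(.*)")


def rewrite(request_path: str) -> Optional[str]:
    """Return the path forwarded to the backend, or None if the rule does not match."""
    match = PATH_PATTERN.match(request_path)
    if match is None:
        return None
    # rewrite-target "/$1": a slash followed by whatever the group captured.
    return "/" + match.group(1)


print(rewrite("/flink/jobs/overview"))
print(rewrite("/flink/"))
print(rewrite("/flink"))
```

Note that `/flink` without the trailing slash does not match the pattern, which is why the web UI is opened with the trailing slash.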

If http://flink.k8s.io/flink/ opens the Flink web UI, the deployment succeeded.

[Screenshot: Flink web UI]

Deploying the TaskManager

Deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: szyx-flink
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
        - name: taskmanager
          imagePullPolicy: Always
          image: sivdead/flink:1.15.3_scala_2.12
          args: ["taskmanager"]
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
          securityContext:
            runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
          resources:
            requests:
              memory: "8192Mi"
              cpu: "4"
            limits:
              memory: "8192Mi"
              cpu: "4"
      # Assumed to be needed with Kubernetes HA so that TaskManagers can read the HA ConfigMaps
      serviceAccountName: flink-service-account
      # Node selector
      nodeSelector:
        zone: mainland
      # Tolerations
      tolerations:
        - key: zone
          value: mainland
          effect: NoSchedule
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
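
The number of TaskManager replicas and `taskmanager.numberOfTaskSlots` together determine how much work the session cluster can run at once. A minimal sketch of the arithmetic, assuming default slot sharing so that a job needs as many slots as its highest operator parallelism (both function names are hypothetical):

```python
def total_slots(taskmanager_replicas: int, slots_per_taskmanager: int) -> int:
    """Slots offered by the whole cluster."""
    return taskmanager_replicas * slots_per_taskmanager


def max_concurrent_jobs(slots: int, parallelism: int) -> int:
    """Jobs of the given parallelism that fit at once, one slot per parallel instance."""
    return slots // parallelism


# Values from this article: replicas: 2, taskmanager.numberOfTaskSlots: 2,
# parallelism.default: 2.
slots = total_slots(2, 2)
print(slots)
print(max_concurrent_jobs(slots, 2))
```

With these settings the cluster offers 4 slots, enough for two jobs at the default parallelism. Raising `replicas` in the Deployment adds slots without touching the Flink configuration.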

After the deployment finishes, open the Flink web UI and check the TaskManagers:

[Screenshot: TaskManagers page in the Flink web UI]

Testing job submission

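  1. In the web UI, upload the WordCount.jar example that ships with Flink.

    [Screenshot: uploaded WordCount.jar]

  2. Restart the JobManager and check that the job jar is still there. It should be, because web.upload.dir points into /opt/flink/tmp, which is backed by the PVC.

  3. Run the job.

    [Screenshot: running the job]

  4. Check the results.

    [Screenshot: job result]

    [Screenshot: job output]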
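
The check that the jar is still there can also be scripted against the JobManager REST API rather than done in the browser. The sketch below assumes the `GET /jars` endpoint returns a JSON object with a `files` array whose entries carry a `name` field, and that the API is reachable through the Ingress address used in this article; `jar_names`, `fetch_jar_names`, and the sample response are hypothetical.

```python
import json
from urllib.request import urlopen

BASE_URL = "http://flink.k8s.io/flink"  # Ingress address from this article


def jar_names(jars_response: dict) -> list:
    """Extract jar file names from a parsed /jars response."""
    return [entry.get("name") for entry in jars_response.get("files", [])]


def fetch_jar_names(base_url: str = BASE_URL) -> list:
    """Query the JobManager for uploaded jars (needs network access to the cluster)."""
    with urlopen(f"{base_url}/jars", timeout=10) as response:
        return jar_names(json.load(response))


# Hypothetical response body, for illustration only.
sample = {"files": [{"id": "abc_WordCount.jar", "name": "WordCount.jar"}]}
print(jar_names(sample))
```

Calling `fetch_jar_names()` before and after restarting the JobManager and comparing the two lists gives the same answer as the manual check.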


http://example.com/2023/03/14/flink/基于k8s部署Session模式Flink集群/
Author
敬文
Published
March 14, 2023
License