Deploying a Session-mode Flink cluster on k8s

Apache Flink is a fast, reliable, and easy-to-use engine for distributed computation. A Flink cluster is a distributed system consisting of a Flink JobManager and multiple Flink TaskManagers, and high availability is an important consideration when deploying one. In this article we walk through deploying a highly available Session-mode Flink cluster on Kubernetes (k8s), using MinIO as the filesystem.
What is Session mode

Flink offers two main ways of running jobs: Application/Per-Job mode, where a dedicated cluster is started for a single job, and Session mode, where a long-running cluster is started first and any number of jobs are then submitted to it, sharing the same JobManager and pool of TaskManagers. When a Session cluster is deployed on Kubernetes, the JobManager and TaskManager processes run in containers and can be managed dynamically by k8s.
The advantages of Session mode are:

- the Flink cluster can be started and stopped as needed
- TaskManagers can be added or removed dynamically
- the cluster can be resized automatically using k8s scaling features
- it integrates with other k8s resources, such as storage volumes and network policies

For these reasons, Session mode is the preferred way to deploy a Flink cluster on Kubernetes.
Flink's filesystem abstraction

During processing, Flink may store data on different file systems, such as the local file system, HDFS, or S3. To handle them uniformly, Flink introduces the FileSystem abstraction, an interface that provides a single way of accessing all of these file systems.

Which FileSystem implementation is used can be specified through Flink's configuration. Flink supports many file systems, including the local file system, HDFS, S3, and Google Cloud Storage. Because MinIO implements the S3 protocol, it can also serve as Flink's file system.
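The checkpoint, savepoint, and HA directories configured later in this article all live under an s3://szyxflink/... bucket, so that bucket needs to exist in MinIO beforehand. A minimal sketch using the MinIO client (mc); the alias name and credentials are placeholders:

```bash
# Register the MinIO endpoint under an alias (credentials are placeholders)
mc alias set myminio https://minio.k8s.io:9000 <minio-access-key> <minio-secret-key>

# Create the bucket that Flink will use for checkpoints, savepoints and HA metadata
mc mb myminio/szyxflink
```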
Deploying a highly available Session-mode Flink cluster on k8s

Component versions:

| Component  | Version |
| ---------- | ------- |
| kubernetes | 1.15.12 |
| flink      | 1.15.3  |
Building the image

Using MinIO as the filesystem requires adding the S3-related dependency JARs, so we need to build our own image.
Dockerfile:
```dockerfile
FROM apache/flink:1.15.3-scala_2.12

ADD lib/flink-sql-connector-mysql-cdc-2.3.0.jar /opt/flink/lib/
ADD lib/flink-connector-jdbc-1.15.3.jar /opt/flink/lib/
ADD lib/mysql-connector-j-8.0.32.jar /opt/flink/lib/
ADD lib/ojdbc8-21.9.0.0.jar /opt/flink/lib/

RUN mkdir /opt/flink/plugins/s3-fs-presto && \
    cp -f /opt/flink/opt/flink-s3-fs-presto-1.15.3.jar /opt/flink/plugins/s3-fs-presto/
```
Build the image:
```bash
docker build -t sivdead/flink:1.15.3_scala_2.12 -f .\Dockerfile .
```
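The cluster nodes also need to be able to pull this image. If you use a public registry such as Docker Hub, pushing it looks roughly like this (replace the repository with your own):

```bash
docker push sivdead/flink:1.15.3_scala_2.12
```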
Configuration files (ConfigMap)

The configuration consists of two parts: flink-conf.yaml and log4j-console.properties.
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  namespace: szyx-flink
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    kubernetes.cluster-id: szyx-flink
    # Namespace the cluster runs in
    kubernetes.namespace: szyx-flink
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 2867m
    parallelism.default: 2
    # File system
    fs.default-scheme: s3
    # MinIO endpoint
    s3.endpoint: https://minio.k8s.io:9000
    # MinIO bucket used by Flink
    s3.flink.bucket: szyxflink
    s3.access-key: <minio access key>
    s3.secret-key: <minio secret key>
    # State backend
    state.backend: rocksdb
    s3.path.style.access: true
    blob.storage.directory: /opt/flink/tmp/blob
    web.upload.dir: /opt/flink/tmp/upload
    io.tmp.dirs: /opt/flink/tmp
    # State management
    # Checkpoint storage location
    state.checkpoints.dir: s3://szyxflink/state/checkpoint
    # Savepoint storage location
    state.savepoints.dir: s3://szyxflink/state/savepoint
    # Checkpoint interval
    execution.checkpointing.interval: 5000
    execution.checkpointing.mode: EXACTLY_ONCE
    # Number of checkpoints to retain
    state.checkpoints.num-retained: 3
    # History server: directory to monitor for completed jobs
    jobmanager.archive.fs.dir: s3://szyxflink/completed-jobs
    # Refresh every 10 seconds
    historyserver.archive.fs.refresh-interval: 10000
    historyserver.archive.fs.dir: s3://szyxflink/completed-jobs
    # High availability
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3://szyxflink/ha
    # The kubernetes.operator.* options below only take effect if the Flink Kubernetes Operator is used
    # Trigger a savepoint every 6 hours
    kubernetes.operator.periodic.savepoint.interval: 6h
    kubernetes.operator.savepoint.history.max.age: 24h
    kubernetes.operator.savepoint.history.max.count: 5
    # Restart of unhealthy job deployments
    kubernetes.operator.cluster.health-check.enabled: true
    # Restart failed job deployments
    kubernetes.operator.job.restart.failed: true
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
```
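Assuming the manifest above is saved as flink-configuration-configmap.yaml (file names in this article are arbitrary), create the namespace and apply it:

```bash
kubectl create namespace szyx-flink
kubectl apply -f flink-configuration-configmap.yaml

# Both flink-conf.yaml and log4j-console.properties should show up in the data section
kubectl -n szyx-flink describe configmap flink-config
```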
Adding a ServiceAccount and granting permissions

When deploying a Flink cluster on Kubernetes, a ServiceAccount is needed to authorize the Flink workload to run inside the cluster. A ServiceAccount is the Kubernetes resource that grants a Pod access to the Kubernetes API. When the Flink JobManager or TaskManager starts, it uses this ServiceAccount to interact with the Kubernetes API, obtain cluster resources, and schedule and execute its work.
```yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink-service-account
  namespace: szyx-flink
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: szyx-flink
  name: flink
rules:
- apiGroups: [""]
  resources: ["pods", "services", "configmaps"]
  verbs: ["create", "get", "list", "watch", "delete"]
- apiGroups: [""]
  resources: ["pods/log"]
  verbs: ["get"]
- apiGroups: ["batch"]
  resources: ["jobs"]
  verbs: ["create", "get", "list", "watch", "delete"]
- apiGroups: ["extensions"]
  resources: ["ingresses"]
  verbs: ["create", "get", "list", "watch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  namespace: szyx-flink
  name: flink-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: flink
subjects:
- kind: ServiceAccount
  name: flink-service-account
  namespace: szyx-flink
```
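Apply the manifest and, optionally, verify that the binding works with kubectl auth can-i (here assuming it was saved as flink-rbac.yaml):

```bash
kubectl apply -f flink-rbac.yaml

# Should print "yes": the service account may create ConfigMaps, which the Kubernetes HA services rely on
kubectl -n szyx-flink auth can-i create configmaps \
  --as=system:serviceaccount:szyx-flink:flink-service-account
```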
Deploying the JobManager

The JobManager mounts a PVC for its temporary and upload directories:
```yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-tmp
  namespace: szyx-flink
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 40Gi
```
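Apply the claim and make sure it becomes Bound before moving on (this assumes the cluster has a default StorageClass or a matching PV; the file name is arbitrary):

```bash
kubectl apply -f flink-jobmanager-pvc.yaml
kubectl -n szyx-flink get pvc flink-tmp
```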
Deployment:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: szyx-flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        imagePullPolicy: Always
        image: sivdead/flink:1.15.3_scala_2.12
        env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        - name: TZ
          value: Asia/Shanghai
        args: ["jobmanager", "$(POD_IP)"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        resources:
          requests:
            memory: "8192Mi"
            cpu: "4"
          limits:
            memory: "8192Mi"
            cpu: "4"
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        - name: tmp-dir
          mountPath: /opt/flink/tmp
        securityContext:
          runAsUser: 9999
      serviceAccountName: flink-service-account
      nodeSelector:
        zone: mainland
      tolerations:
      - key: zone
        value: mainland
        effect: NoSchedule
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: tmp-dir
        persistentVolumeClaim:
          claimName: flink-tmp
```
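Apply the Deployment and watch it come up; the JobManager log is the first place to look if the S3/MinIO settings are wrong (file name again arbitrary):

```bash
kubectl apply -f flink-jobmanager-deployment.yaml
kubectl -n szyx-flink rollout status deployment/flink-jobmanager

# Tail the JobManager log
kubectl -n szyx-flink logs -l app=flink,component=jobmanager --tail=50
```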
Service:
```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
```
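Apply the Service (it does not set a namespace itself, so pass -n); even before the Ingress exists you can reach the web UI through a port-forward:

```bash
kubectl -n szyx-flink apply -f flink-jobmanager-service.yaml

# Temporary access to the web UI on http://localhost:8081
kubectl -n szyx-flink port-forward svc/flink-jobmanager 8081:8081
```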
Ingress:
```yaml
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  annotations:
    nginx.ingress.kubernetes.io/proxy-body-size: 300m
    nginx.ingress.kubernetes.io/rewrite-target: /$1
  name: job-manager
  namespace: szyx-flink
spec:
  rules:
  - host: flink.k8s.io
    http:
      paths:
      - backend:
          serviceName: flink-jobmanager
          servicePort: 8081
        path: /flink/(.*)
```
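Apply the Ingress and check that it is picked up by the ingress controller (the flink.k8s.io host must resolve to that controller):

```bash
kubectl apply -f flink-jobmanager-ingress.yaml
kubectl -n szyx-flink get ingress job-manager
```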
If visiting http://flink.k8s.io/flink/ opens the Flink web UI, the JobManager deployment is working.
Deploying the TaskManagers

Deployment:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: szyx-flink
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        imagePullPolicy: Always
        image: sivdead/flink:1.15.3_scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999
        resources:
          requests:
            memory: "8192Mi"
            cpu: "4"
          limits:
            memory: "8192Mi"
            cpu: "4"
      # Needed so the TaskManagers can read the HA leader ConfigMaps (Kubernetes HA services)
      serviceAccountName: flink-service-account
      nodeSelector:
        zone: mainland
      tolerations:
      - key: zone
        value: mainland
        effect: NoSchedule
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
```
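Apply the TaskManager Deployment and confirm that both replicas register with the JobManager; the REST API behind the Ingress can be used for a quick check, assuming the flink.k8s.io host from above resolves:

```bash
kubectl apply -f flink-taskmanager-deployment.yaml
kubectl -n szyx-flink rollout status deployment/flink-taskmanager

# Should list two TaskManagers once they have registered
curl http://flink.k8s.io/flink/taskmanagers
```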
After the deployment finishes, open the Flink web UI and check the TaskManagers page:
Testing job submission

- Submit Flink's bundled example, WordCount.jar, from the web UI (a command-line alternative is sketched after this list)
- Restart the JobManager and check that the uploaded job JAR is still there (it is kept under web.upload.dir on the PVC, so it survives restarts)
- Run the job
- Check the result
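As a command-line alternative to the web UI, the bundled WordCount example can also be submitted with the Flink CLI from inside the JobManager pod; a sketch (the example JAR ships with the official image):

```bash
# Find the JobManager pod and submit the bundled streaming WordCount example
JM_POD=$(kubectl -n szyx-flink get pods -l app=flink,component=jobmanager \
  -o jsonpath='{.items[0].metadata.name}')
kubectl -n szyx-flink exec -it "$JM_POD" -- \
  /opt/flink/bin/flink run /opt/flink/examples/streaming/WordCount.jar
```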