1. Create the headless Service

A headless Service does not allocate a cluster virtual IP; instead, it exposes DNS records for its backing Pods directly. There is no default load balancing, so clients reach the Pod IPs themselves. This makes a headless Service useful whenever you need to interact directly with the real Pod IPs inside the cluster. The key configuration of the Service is clusterIP: None, which prevents a ClusterIP from being allocated so that DNS resolution goes straight to the Pods:

---
kind: Service
apiVersion: v1
metadata:
  name: ecc-spark-service
  namespace: ecc-spark-cluster
spec:
  clusterIP: None
  ports:
    - port: 7077
      protocol: TCP
      targetPort: 7077
      name: spark
    - port: 10000
      protocol: TCP
      targetPort: 10000
      name: thrift-server-tcp
    - port: 8080
      targetPort: 8080
      name: http
    - port: 45970
      protocol: TCP
      targetPort: 45970
      name: thrift-server-driver-tcp
    - port: 45980
      protocol: TCP
      targetPort: 45980
      name: thrift-server-blockmanager-tcp
    - port: 4040
      protocol: TCP
      targetPort: 4040
      name: thrift-server-tasks-tcp
  selector:
    app: ecc-spark-service

The fully qualified domain name of the Service is ecc-spark-service.ecc-spark-cluster.svc.cluster.local; a headless Service follows the same pattern, e.g. headless-service.ecc-spark-cluster.svc.cluster.local. Pinging the FQDN from inside a container shows the difference: a regular Service resolves to its ClusterIP, while a headless Service resolves directly to the Pod IPs.
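You can verify this from inside the cluster. The sketch below assumes the manifest above is saved as ecc-spark-service.yaml; the throwaway pod name and busybox image are illustrative:

kubectl apply -f ecc-spark-service.yaml

# Launch a disposable pod with DNS tools and query the headless Service:
# it should return Pod IPs directly, not a ClusterIP.
kubectl -n ecc-spark-cluster run dns-test --rm -it --image=busybox --restart=Never -- \
  nslookup ecc-spark-service.ecc-spark-cluster.svc.cluster.local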

2. Build the Spark cluster

2.1 Create the Spark master

The Spark master consists of two parts: the workload itself, a Deployment defined in ecc-spark-master.yaml, and a Service that exposes the master's port 7077 to the workers.

# The Thrift Server is deployed on the master node here, so the Thrift Server,
# driver, and block-manager ports must also be exposed so that executors on the
# worker nodes can communicate with the driver.

cat >ecc-spark-master.yaml <<EOF
kind: Deployment
apiVersion: apps/v1
metadata:
  name: ecc-spark-master
  namespace: ecc-spark-cluster
  labels:
    app: ecc-spark-master
spec:
  replicas: 1
  selector:
    matchLabels:
      app: ecc-spark-master
  template:
    metadata:
      labels:
        app: ecc-spark-master
    spec:
      serviceAccountName: spark-cdp
      securityContext: {}
      dnsPolicy: ClusterFirst
      hostname: ecc-spark-master
      containers:
        - name: ecc-spark-master
          image: spark:3.4.1
          imagePullPolicy: IfNotPresent
          command: ["/bin/sh"]
          args: ["-c","sh /opt/spark/sbin/start-master.sh && tail -f /opt/spark/logs/spark--org.apache.spark.deploy.master.Master-1-*"]
          ports:
            - containerPort: 7077
            - containerPort: 8080
          volumeMounts:
            - mountPath: /opt/usrjars/
              name: ecc-spark-pvc
          livenessProbe:
            failureThreshold: 9
            initialDelaySeconds: 2
            periodSeconds: 15
            successThreshold: 1
            tcpSocket:
              port: 8080
            timeoutSeconds: 10
          resources:
            requests:
              cpu: "2"
              memory: "6Gi"
            limits:
              cpu: "2"
              memory: "6Gi"
          env:
            - name: SPARK_LOCAL_DIRS
              value: "/odsdata/sparkdirs/"
      volumes:
        - name: ecc-spark-pvc
          persistentVolumeClaim:
            claimName: ecc-spark-pvc-static
EOF
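Apply the manifest and check that the master comes up. This sketch assumes the ecc-spark-cluster namespace, the spark-cdp ServiceAccount, and the ecc-spark-pvc-static PVC already exist:

kubectl apply -f ecc-spark-master.yaml

# The pod should reach Running, with the liveness probe on 8080 passing
kubectl -n ecc-spark-cluster get pods -l app=ecc-spark-master

# The master log should show it listening on spark://ecc-spark-master:7077
kubectl -n ecc-spark-cluster logs deploy/ecc-spark-master | grep -i "spark master at"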

2.2 Create the Spark worker

The worker start script must be given the master's address. Because the cluster runs Kubernetes DNS and the Service has been set up, the master can be reached at ecc-spark-master.ecc-spark-cluster.svc.cluster.local:7077.

cat >ecc-spark-worker.yaml <<EOF
kind: Deployment
apiVersion: apps/v1
metadata:
  name: ecc-spark-worker
  namespace: ecc-spark-cluster
  labels:
    app: ecc-spark-worker
spec:
  replicas: 1
  selector:
    matchLabels:
      app: ecc-spark-worker
  template:
    metadata:
      labels:
        app: ecc-spark-worker
    spec:
      serviceAccountName: spark-cdp
      securityContext: {}
      dnsPolicy: ClusterFirst
      hostname: ecc-spark-worker
      containers:
        - name: ecc-spark-worker
          image: spark:3.4.1
          imagePullPolicy: IfNotPresent
          command: ["/bin/sh"]
          args: ["-c","sh /opt/spark/sbin/start-worker.sh spark://ecc-spark-master.ecc-spark-cluster.svc.cluster.local:7077;tail -f /opt/spark/logs/spark--org.apache.spark.deploy.worker.Worker*"]
          ports:
            - containerPort: 8081
          volumeMounts:
            - mountPath: /opt/usrjars/
              name: ecc-spark-pvc
          resources:
            requests:
              cpu: "2"
              memory: "2Gi"
            limits:
              cpu: "2"
              memory: "4Gi"
          env:
            - name: SPARK_LOCAL_DIRS
              value: "/odsdata/sparkdirs/"
      volumes:
        - name: ecc-spark-pvc
          persistentVolumeClaim:
            claimName: ecc-spark-pvc-static
EOF
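Apply the manifest and confirm each worker registers with the master; the replica count below is illustrative:

kubectl apply -f ecc-spark-worker.yaml

# Scale out by raising replicas; each pod runs one Spark worker
kubectl -n ecc-spark-cluster scale deploy/ecc-spark-worker --replicas=3

# Each worker should log "Successfully registered with master"
kubectl -n ecc-spark-cluster logs -l app=ecc-spark-worker | grep -i registered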

2.3 Build the PySpark submission environment

The submission environment is a small Flask app that accepts HTTP requests and hands job submissions to a thread pool:

import json

import flask
from flask import Flask
from concurrent.futures import ThreadPoolExecutor

app = Flask(__name__)
# Thread pool used to run Spark submissions asynchronously
pool = ThreadPoolExecutor(max_workers=8)


@app.route('/')
def hello_world():
    return 'Hello World!'


@app.route('/downloadCode', methods=['POST'])
def download_file():
    model_id = flask.request.json.get('modelId')
    print(model_id)
    """
    Submit the task asynchronously: pool.submit()
    """
    return json.dumps(0, ensure_ascii=False)


@app.route('/modelRun', methods=['POST'])
def model_run():
    """
    Submit the task asynchronously: pool.submit()
    """
    return json.dumps(0, ensure_ascii=False)


if __name__ == '__main__':
    app.run()
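The pool.submit() calls in the handlers would typically shell out to spark-submit against the standalone master. A sketch of such a command follows; the script path and options are illustrative assumptions, not from the original (note that Spark standalone mode only supports client deploy mode for Python applications):

/opt/spark/bin/spark-submit \
  --master spark://ecc-spark-master.ecc-spark-cluster.svc.cluster.local:7077 \
  --deploy-mode client \
  --name model-run \
  /opt/usrjars/model_job.py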

The spark:3.4.1 image already ships with Python 3, as a quick check inside the container shows:

spark@c67e6477b2f1:/opt/spark$ python3
Python 3.8.10 (default, May 26 2023, 14:05:08)
[GCC 9.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>>

Integrate the launch of the Python app at the end of the start-master.sh script so it starts together with the master; the spark-master HTTP port can then be exposed through Kubernetes and the F5 load balancer to serve HTTP calls.
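One way to wire this up, as a sketch; the app path /opt/spark/app.py is an illustrative assumption:

# Appended to the end of /opt/spark/sbin/start-master.sh:
# start the Flask submission service in the background next to the master.
nohup python3 /opt/spark/app.py >/opt/spark/logs/flask-app.log 2>&1 &

Note that app.run() binds to 127.0.0.1:5000 by default; pass host='0.0.0.0' so the port is reachable from outside the pod.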

3. Installing a Spark cluster with spark-operator

For this approach, refer to the Alibaba Cloud article “搭建Spark应用” (Building a Spark Application).
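As a taste of that approach, here is a minimal SparkApplication sketch. It assumes the spark-operator and its v1beta2 CRDs are installed, and that the SparkPi example jar path matches the spark:3.4.1 image; adjust to your environment:

cat <<EOF | kubectl apply -f -
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: ecc-spark-cluster
spec:
  type: Scala
  mode: cluster
  image: spark:3.4.1
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.4.1.jar"
  sparkVersion: "3.4.1"
  driver:
    cores: 1
    memory: "512m"
    serviceAccount: spark-cdp
  executor:
    cores: 1
    instances: 2
    memory: "512m"
EOF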
