背景

在数据量较大的业务场景中,spark在数据处理、传统机器学习训练、 深度学习相关业务,能取得较明显的效率提升。 本篇围绕spark大数据背景下的推理,介绍一些优雅的使用方式。

spark适用场景

大数据量自定义方法处理、类sql处理传统机器学习方法(k-means、xgboost、lr…)分布式深度学习推理

目前在10亿+数据量的推理场景中使用,需要用户自己实现批数据准备,基于RDD的方法完成模型推理输出。 业务使用中的问题:

模型文件重复导入加载自定义批数据准备,脱离深度学习dataloader框架,操作略显麻烦,有性能和内存oom等问题。

实践

spark加速深度学习推理

spark加速深度学习推理,基本思路为:

开启不定量worker并行执行(cpu或gpu)推理任务所有worker共享同一份模型参数依赖spark pandas udf功能,方便并行处理 dataframe 数据依赖深度学习框架,方便实现最优批数据划分 下面以pytorch resnet 为实践demo

加载&&广播模型参数

广播模型参数,不仅能减少模型重复加载带来的流量和io,而且能加速推理前模型加载的速度。 driver广播模型参数:

# Load ResNet50 on driver node and broadcast its state.

model_state = models.resnet50(pretrained=True).state_dict()

bc_model_state = sc.broadcast(model_state)

worker读取模型参数:

def get_model_for_eval():

"""Gets the broadcasted model."""

model = models.resnet50(pretrained=True)

model.load_state_dict(bc_model_state.value)

model.eval()

return model

实现基于dataframe的dataset

目前主流的深度学习框架,dataset的实现大多基于本地存储,在读取分布式存储的场景 需要用户自定义实现。 自定义实现有2个方法:

使用分布式存储的api接口读取文件内容dataset读取dataframe二进制文件内容

方法一迭代与使用的存储类型会保持同步,且每次使用前需要明确使用的分布式存储,虽然实现方法容易但是使用流程略显麻烦。 方法二不需要关心分布式存储类型,只要需要获取并解析spark dataframe列传入内容即可。

本文采用方法二实现dataset:

# 从二进制流中解析图片信息

def pil_loader(binary_file):

# open path as file to avoid ResourceWarning (https://github.com/python-pillow/Pillow/issues/835)

image_io = io.BytesIO(binary_file)

img = Image.open(image_io)

return img.convert('RGB')

# Create a custom PyTorch dataset class.

class ImageDataset(Dataset):

def __init__(self, data, transform=None):

self.data = data

self.transform = transform

def __len__(self):

return len(self.data)

def __getitem__(self, index):

image = pil_loader(self.data[index])

if self.transform is not None:

image = self.transform(image)

return image

实现批量推理的pandas udf

Pandas udf是基于RDD的一个低门槛高性能的实现方法,pandas udf能自定义处理逻辑,以列的方式操作datafrme内容。 这是社区目前推荐的自定义处理方式。

# Define the function for model inference.

# PyArrow >= 1.0.0 must be installed;

@pandas_udf(ArrayType(FloatType()))

def predict_batch_udf(binaray_data: pd.Series) -> pd.Series:

transform = transforms.Compose([

transforms.Resize(224),

transforms.CenterCrop(224),

transforms.ToTensor(),

transforms.Normalize(mean=[0.485, 0.456, 0.406],

std=[0.229, 0.224, 0.225])

])

images = ImageDataset(binaray_data, transform=transform)

loader = torch.utils.data.DataLoader(images, batch_size=500, num_workers=8)

model = get_model_for_eval()

model.to(device)

all_predictions = []

with torch.no_grad():

for batch in loader:

predictions = list(model(batch.to(device)).cpu().numpy())

for prediction in predictions:

all_predictions.append(prediction)

return pd.Series(all_predictions)

# 调用pandas udf

predictions_df = df. \

select(col("filename"), predict_batch_udf(col("data")).alias("prediction"))

更多代码细节: https://github.com/Crazybean-lwb/deeplearning-pyspark/blob/master/examples/pytorch-inference.py

模型仓加速推理

打通到模型仓mlflow功能:

模型存储和版本管理便捷取用适用spark datarame更高阶的pandas udf实现

# Create the PySpark UDF

import mlflow.pyfunc

pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

# 调用pandas udf

df = spark_df.withColumn("prediction", pyfunc_udf(struct([...])))

参考信息:

pytorch分布式批量推理tensorflow分布式批量推理模型仓mlflow协助分布式批量推理

参考阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: