背景介绍

Kafka是一个分布式流处理平台,可以处理大规模数据流并支持实时数据流的处理。

本文介绍了如何在WSL下使用Docker搭建Kafka容器,并使用Python的kafka-python库和FastAPI框架实现了一个简单的API。同时,还将该服务整合到一个整体的docker-compose中。文章详细介绍了Docker网络、Kafka环境变量配置、Python连接Kafka的方法以及API的开发。

实验环境

WSL2 Ubuntu18.04 | Docker

⚙️容器配置与搭建

镜像选择

bitnami/kafka镜像

Bitnami是一个提供开发、部署和管理应用程序的软件公司。Bitnami提供了Kafka的Docker镜像,并有非常详细的文档。我们将使用这个镜像来搭建Kafka容器。

由于Kafka需要Zookeeper支持,我们可以通过docker-compose来快速组合多个容器。

按照官方的文档,我们可以通过如下的配置快速搭建一个Kafka+Zookeeper的中间件:

# docker-compose.yml

version: "3"

networks:

app-tier:

driver: bridge

services:

zookeeper:

restart: always

image: 'bitnami/zookeeper:latest'

networks:

- app-tier

ports:

- '2181:2181'

environment:

- ALLOW_ANONYMOUS_LOGIN=yes

kafka:

restart: always

image: 'bitnami/kafka:latest'

networks:

- app-tier

ports:

- '9092:9092'

- '9093:9093'

environment:

- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181

- ALLOW_PLAINTEXT_LISTENER=yes

- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT

- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093

- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093

- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT

depends_on:

- zookeeper

在network部分,我们一个定义了一个 Docker 网络,名称为 “app-tier”,驱动程序为 “bridge”。

在 Docker 中,网络是一种虚拟网络,使容器之间可以进行通信。 “bridge” 驱动程序是 Docker 网络的默认驱动程序,它在单个 Docker 主机内创建一个内部网络,允许容器使用其 IP 地址相互通信。通过定义名称为 “app-tier”,驱动程序为 “bridge” 的网络,连接到该网络的任何容器都将能够使用其在网络内的 IP 地址相互通信。这可以用于创建微服务架构或其他分布式系统,其中多个容器需要彼此通信。

在Kafka的部分,我们设置了 Kafka 的环境变量:包括 Zookeeper 地址、允许明文监听器、监听器安全协议映射、监听器和广告监听器等。

注意,当前的配置将允许明文监听Kafka,这在实际生产环境中是不被允许的,我们在此为了便于开发环境的测试,允许直接监听。

剩余的环境配置可以在官网详细查询

容器使用

使用如下命令启动容器

docker-compose up

使用如下命令进入容器

sudo docker ps

sudo docker exec -it xx bash

注:其中xx是ps命令得到的Kafka容器的id前两位

进入容器后我们可以通过如下命令开启生产者和消费者

// 确保你在/opt/bitnami/kafka目录下

// 创建topic

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

// 启动生产者

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

// 启动消费者

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

你可以打开两个终端,分别开启一个生产者和消费者。如果环境运行正常,就可以在消费者的端口同步查看到生产的输入。

基于此,我们快速地搭建了一个Kafka的开发环境,并且成功地启动了生产者和消费者。在实际的开发中,我们可以使用这个环境来进行Kafka相关的开发和测试工作。同时,在生产环境中,我们需要根据实际情况来进行更加严格和安全的配置,以确保Kafka的安全和可靠性。

API构建与测试

除了命令行,我们还可以使用第三方框架实现对Kafka的连接和操作。下面用Python连接kafka并搭建一个简单的API供调试。

安装Python依赖

kafka-python

fastapi

uvicorn

pydantic

使用懒汉式的单例模式构建一个用于连接Kafka的上下文对象

# kafkaContext.py

import random

from kafka import KafkaProducer, KafkaClient

import time

def _get_kafka_producer_connection(host, port) -> KafkaProducer:

while True:

try:

producer = KafkaProducer(bootstrap_servers=f'{host}:{port}')

break

except Exception as e:

print('producer failed to connect, retrying', e)

time.sleep(5)

print('producer connected', producer)

return producer

def _get_kafka_client_connection(host, port) -> KafkaClient:

while True:

try:

client = KafkaClient(bootstrap_servers=f'{host}:{port}')

break

except Exception as e:

print('client failed to connect, retrying', e)

time.sleep(5)

print('client connected', client)

return client

class KafkaContext:

# 构建一个单例模式的producer

def __init__(self, host='kafka', port=9092):

self.__host = host

self.__port = port

self.__client = None

self.__producer = None

def __connect_client(self):

self.__client = _get_kafka_client_connection(self.__host, self.__port)

def __connect_producer(self):

self.__producer = _get_kafka_producer_connection(self.__host, self.__port)

def is_client_connected(self) -> bool:

if self.__client is None:

return False

return self.__client.bootstrap_connected()

def is_producer_connected(self) -> bool:

return self.__producer is not None

def add_topic(self, topic: str):

if self.__client is None:

self.__connect_client()

self.__client.add_topic(topic)

def send_msg(self, topic: str, msg: str) -> str:

if self.__producer is None:

self.__connect_producer()

# convert msg to bytes

msg_bytes = bytes(msg, encoding='utf-8')

self.__producer.send(topic, msg_bytes)

return msg

kafkaContext = KafkaContext()

注意到我们使用了kafka 的解析去连接Kafka,是因为我们后续要将该服务放在一个Docker容器内,并且接入到和上文提到的Kafka模块的网络中

使用FastAPI构建一个API,提供基本的状态检测、增加topic和生产消息的接口

# api.py

from datetime import datetime

from pydantic import BaseModel

from KafkaContext import kafkaContext

from fastapi import FastAPI

app = FastAPI(title="kafka-server", description="kafka-server", version="0.1.0")

class Message(BaseModel): # 继承了BaseModel,定义了People的数据格式

topic: str

msg: str

@app.get("/")

def read_root():

return {"time": datetime.now(), "status": "ok"}

@app.get("/health/client")

def health_client():

return {

"data": kafkaContext.is_client_connected()

}

@app.get("/health/producer")

def health_producer():

return {

"data": kafkaContext.is_producer_connected()

}

@app.get("/producer/add_topic/{topic}")

def add_topic(topic: str):

kafkaContext.add_topic(topic)

return {

"status": "ok",

"data": topic

}

@app.post("/producer/send_msg")

def send_msg(message: Message):

return {

"status": "ok",

"data": kafkaContext.send_msg(message.topic, message.msg)

}

如果希望开发更多的接口,可以阅读官方的文档 kafka-python · PyPI

在命令行输入命令启动服务

uvicorn api:app --host 0.0.0.0 --port 8000 --reload

打开浏览器访问http://:8000/docs即可看到接口文档,如果Kafka的消费者还在运行,则可以尝试接口是否运行正常。

至此,我们已经实现了用Docker搭建一个Kafka模块并使用FastAPI结合python-kafka实现了接口的开发。

容器整合

最后,我们将该服务整合到一个整体的docker-compose中

首先我们先构建后端API的镜像,该镜像使用了Python的环境。

FROM python

LABEL author="chene2000"

ENV PYTHONIOENCODING=utf-8

RUN mkdir -p /app

WORKDIR /app

COPY requirements.txt /app

RUN pip3 install -r requirements.txt -i https://pypi.doubanio.com/simple --trusted-host pypi.doubanio.com

COPY . /app

CMD bash start-server.sh

在docker-compose.yml中追加一个server的服务

server:

restart: always

# image: 'kafka-server'

container_name: 'kafka-server'

build:

dockerfile: Dockerfile

context: ./server/

networks:

- app-tier

ports:

- '8002:8000'

depends_on:

- zookeeper

- kafka

注意,我们采用了如下的项目结构

├── docker-compose.yml

└── server

├── api.py

├── Dockerfile

├── KafkaContext.py

├── requirements.txt

└── start-server.sh

在docker-compose.yml的build处我们配置了Docker容器构建时的目录位置和构建所用的Dockerfile配置。

回到根目录,运行如下命令

# 构建容器

sudo docker compose build

# 启动容器

sudo docker compose up

# 启动容器(后台运行)

sudo docker compose up -d

构建完毕

️ 小结

在实际生产环境中,应注意Kafka的安全性和可靠性。在使用第三方框架连接Kafka时,需要使用Kafka的解析进行连接。最后,将Kafka模块整合到docker-compose中,方便进行部署和使用。

项目代码 https://github.com/ChenE2000/thesis-kafka-server

好文链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: