Consuming Kafka Messages with Lambda and Glue Schema Registry in the AWS China Regions

This article builds a complete integration between a self-hosted Kafka broker running in Docker, AWS Lambda, and Glue Schema Registry in the AWS China region cn-north-1. Kafka runs on an EC2 instance, Lambda consumes messages over the VPC private network, and the data is serialized as Avro.

The overall data flow (from the sequence diagram; participants are Producer (Avro), Docker Kafka, Lambda Function, Glue Schema Registry, and CloudWatch Logs):

1. Producer -> Glue Schema Registry: register the schema
2. Producer -> Docker Kafka: send Avro messages
3. Docker Kafka -> Lambda Function: the ESM polls the topic and pushes batches
4. Lambda Function -> Glue Schema Registry: fetch the schema (by ID)
5. Glue Schema Registry -> Lambda Function: return the schema definition
6. Lambda Function: Avro deserialization
7. Lambda Function -> CloudWatch Logs: write processing logs

Core Concepts

SelfManagedKafka event source

AWS Lambda supports several event sources. The SelfManagedKafka type lets Lambda connect directly to a self-hosted Kafka cluster without going through MSK.

- KafkaBootstrapServers: Kafka broker addresses, as an array
- Topics: the list of topics to subscribe to
- StartingPosition: where consumption starts (LATEST/TRIM_HORIZON)
- SourceAccessConfigurations: VPC access configuration

Note: KafkaBootstrapServers must be an array.

```yaml
KafkaBootstrapServers:
  - !Sub "${EC2PrivateIp}:9092"
```

EventSourceMapping event format

The event Lambda receives from Kafka is structured differently from a direct invocation:

- records is a dictionary whose keys are {topic}-{partition}
- value is the Base64-encoded message body
- Lambda has to iterate over the values of the records dictionary

```json
{
  "eventSource": "SelfManagedKafka",
  "bootstrapServers": "172.31.14.46:9092",
  "records": {
    "orders-0": [
      {
        "topic": "orders",
        "partition": 0,
        "offset": 1,
        "timestamp": 1779613023206,
        "timestampType": "CREATE_TIME",
        "value": "eyJvcmRlcl9pZCI6...",
        "headers": []
      }
    ]
  }
}
```

Glue Schema Registry integration

Glue Schema Registry provides schema definitions and version management. However, according to the AWS documentation, provisioned mode for Lambda event source mappings is currently unavailable in the China Regions:

Provisioned mode for event source mappings is not available in the China Regions.

Provisioned Mode is one of the polling modes of a Lambda ESM (Event Source Mapping); it controls how Lambda pulls messages from Kafka/MSK/SQS. Because SchemaRegistryConfig can only be used together with ProvisionedPollerConfig (that is, Provisioned Mode), a Lambda ESM in the China Regions cannot use the automatic Schema Registry validation. The workaround is to handle Avro deserialization manually in the Lambda code.

From the AWS documentation, "Using schema registries with Kafka event sources":

This feature is only available for event source mappings using provisioned mode. Schema registry doesn’t support event source mappings in on-demand mode.

Configuring SchemaRegistryConfig in On-Demand mode fails with the following error:

```text
SchemaRegistryConfig is only available for Provisioned Mode. To configure Schema Registry, please enable Provisioned Mode by specifying MinimumPollers in ProvisionedPollerConfig.
```

Schema Registry integration requires extra work at the ESM poller level (looking up the schema and decoding the message), and AWS ties this capability to Provisioned Mode.

Diagram (Provisioned Mode required): Kafka Cluster emits a Kafka message (Avro bytes) -> Lambda ESM Poller performs the Schema Registry lookup (automatic Glue schema query) and the Avro decode (automatic deserialization) -> the Lambda Function handler receives a JSON-format event.

Since Provisioned Mode is unavailable, Avro deserialization has to be done manually in the Lambda code:

```python
import base64

import boto3
from aws_schema_registry import SchemaRegistryClient, KafkaDeserializer

glue_client = boto3.client("glue", region_name="cn-north-1")
# The client is given the registry name, matching the full handler and producer below
# (registry ARN: arn:aws-cn:glue:cn-north-1:xxxxxxxxxx:registry/orders-registry).
registry_name = "orders-registry"
schema_client = SchemaRegistryClient(glue_client, registry_name=registry_name)
deserializer = KafkaDeserializer(schema_client)


def lambda_handler(event, context):
    for topic_partition, records in event.get("records", {}).items():
        for record in records:
            value_bytes = base64.b64decode(record["value"])
            # The topic comes from the record itself, not from the dictionary key.
            decoded = deserializer.deserialize(record["topic"], value_bytes)
            # Process decoded.data (a Python dict)
            ...
```

Deployment and Configuration

SAM deployment

The infrastructure SAM template is as follows:

```yaml
AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31

Resources:
  # Lambda Layer - contains aws-glue-schema-registry
  GlueSchemaRegistryLayer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      LayerName: glue-schema-registry-layer
      ContentUri: glue-schema-registry-layer.zip
      CompatibleRuntimes:
        - python3.12

  # Glue Registry
  GlueRegistry:
    Type: AWS::Glue::Registry
    Properties:
      Name: orders-registry

  # Lambda Function
  ConsumerFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: kafka-order-consumer
      Runtime: python3.12
      Handler: consumer.lambda_handler
      CodeUri: .
      Layers:
        - !Ref GlueSchemaRegistryLayer
      VpcConfig:
        SubnetIds:
          - !Ref PrivateSubnet1
        SecurityGroupIds:
          - !Ref LambdaSecurityGroup
      Environment:
        Variables:
          GLUE_REGISTRY_ARN: !Ref GlueRegistry
      Events:
        KafkaEvent:
          Type: SelfManagedKafka
          Properties:
            KafkaBootstrapServers:
              - !Sub "${EC2PrivateIp}:9092"
            Topics:
              - orders
            StartingPosition: LATEST
            # AWS documents these URIs as subnet:<subnet-id> and security_group:<sg-id>;
            # adjust if the references below resolve to bare IDs.
            SourceAccessConfigurations:
              - Type: VPC_SUBNET
                URI: !Ref PrivateSubnet1
              - Type: VPC_SECURITY_GROUP
                URI: !Ref LambdaSecurityGroup
      Policies:
        - Statement:
            - Sid: GlueAccess
              Effect: Allow
              Action:
                - glue:GetRegistry
                - glue:GetSchemaVersion
                - glue:GetSchemaByDefinition
                - glue:GetSchema
              Resource: "*"
```

Because the China Regions do not support ESM-level automatic Schema Registry validation, the Lambda function has to integrate with Glue Schema Registry itself to deserialize messages. The function therefore needs to bundle the aws-glue-schema-registry library.

Create the Layer:

```bash
# Build the layer locally
mkdir -p layer/python
pip install -t layer/python aws-glue-schema-registry boto3
cd layer
zip -r ../glue-schema-registry-layer.zip .
```

Reference it in the SAM template:

```yaml
Layers:
  - !Ref GlueSchemaRegistryLayer
```

Deployment commands:

```bash
# Build
sam build
# Deploy
sam deploy --resolve-s3 --no-confirm-changeset
```

The deployed resources are as follows (screenshot).

Lambda code example

The handler code is as follows:

```python
import json
import base64
import os
import logging

import boto3
from aws_schema_registry import SchemaRegistryClient, KafkaDeserializer

# Initialize outside the handler so warm invocations reuse these objects
logger = logging.getLogger()
logger.setLevel(os.getenv("LOG_LEVEL", "INFO"))
glue_client = boto3.client("glue", region_name="cn-north-1")
registry_name = "orders-registry"

# The Schema Registry client is initialized lazily
schema_client = None
deserializer = None


def get_deserializer():
    """Lazily initialize the deserializer."""
    global schema_client, deserializer
    if deserializer is None:
        schema_client = SchemaRegistryClient(glue_client, registry_name)
        deserializer = KafkaDeserializer(schema_client)
    return deserializer


def lambda_handler(event, context):
    """
    Handle Kafka events, deserializing Avro messages with Glue Schema Registry.

    Two message formats are supported:
    1. Avro (with a schema ID prefix) - deserialized via Glue Schema Registry
    2. JSON - parsed directly
    """
    logger.info(f"Event source: {event.get('eventSource')}")
    results = []
    batch_item_failures = []
    records_by_topic = event.get("records", {})

    for topic_partition, records in records_by_topic.items():
        logger.info(f"Processing {topic_partition}: {len(records)} records")
        for record in records:
            try:
                topic = record.get("topic", "unknown")
                partition = record.get("partition", -1)
                offset = record.get("offset", -1)
                value_b64 = record.get("value", "")

                if not value_b64:
                    value = {}
                else:
                    value_bytes = base64.b64decode(value_b64)
                    # Try Avro deserialization first
                    try:
                        deser = get_deserializer()
                        decoded = deser.deserialize(topic, value_bytes)
                        value = decoded.data
                        logger.info(f"[{topic}] p={partition} o={offset} (Avro) data={value}")
                    except Exception as avro_err:
                        # Fall back to JSON parsing
                        try:
                            value = json.loads(value_bytes.decode("utf-8"))
                            logger.info(f"[{topic}] p={partition} o={offset} (JSON) data={value}")
                        except Exception as json_err:
                            logger.error(f"Failed to deserialize: avro={avro_err}, json={json_err}")
                            raise avro_err

                # Business logic
                process_order(value)
                results.append({
                    "recordId": record.get("recordId", ""),
                    "result": "Ok",
                    "data": value_b64,
                })
            except Exception as e:
                logger.error(f"Failed to process record: {e}")
                batch_item_failures.append({"itemIdentifier": str(record.get("offset"))})

    if batch_item_failures:
        return {"batchItemFailures": batch_item_failures}
    return {"records": results}


def process_order(order: dict):
    """Business processing logic."""
    order_id = order.get("order_id")
    logger.info(f"Processing order: {order_id}")
```

The producer uses the aws-glue-schema-registry library to serialize Avro messages:

```python
#!/usr/bin/env python3
import uuid
from datetime import datetime, timezone

import boto3
from aws_schema_registry import SchemaRegistryClient, KafkaSerializer, DataAndSchema
from aws_schema_registry.avro import AvroSchema
from confluent_kafka import Producer

REGISTRY_NAME = "orders-registry"
BOOTSTRAP_SERVERS = "172.31.1.2:9092"
TOPIC = "orders"
AVRO_SCHEMA = """
{
  "type": "record",
  "name": "Order",
  "namespace": "com.example.orders",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "status", "type": "string"},
    {"name": "created_at", "type": "string"}
  ]
}
"""


def main():
    glue = boto3.client("glue", region_name="cn-north-1")
    schema_client = SchemaRegistryClient(glue, registry_name=REGISTRY_NAME)
    serializer = KafkaSerializer(schema_client)
    producer = Producer({"bootstrap.servers": BOOTSTRAP_SERVERS})

    for i in range(3):
        order = {
            "order_id": f"avro-{uuid.uuid4().hex[:8]}",
            "customer_id": f"cust-{(i % 5) + 1:03d}",
            "amount": round(100.0 + i * 10.5, 2),
            "status": "pending",
            "created_at": datetime.now(timezone.utc).isoformat(),
        }
        print(f"Sending Avro message {i + 1}: {order['order_id']}")

        schema = AvroSchema(AVRO_SCHEMA.strip())
        serialized = serializer.serialize(
            TOPIC,
            DataAndSchema(data=order, schema=schema),
        )
        producer.produce(
            topic=TOPIC,
            value=serialized,
            callback=lambda err, msg: print(
                f"Delivered to {msg.topic()} [{msg.partition()}]" if not err else f"Failed: {err}"
            ),
        )
        producer.poll(0)

    producer.flush()
    print(f"\nSent 3 Avro messages to {TOPIC}")


if __name__ == "__main__":
    main()
```

Note: aws-glue-schema-registry automatically creates a schema in Glue named {topic}-value (for example orders-value) rather than using the order-schema created by SAM.

Kafka deployment

Kafka runs in KRaft mode:

```yaml
services:
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://:9092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://${PRIVATE_IP}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
    volumes:
      - kafka-data:/var/lib/kafka/data

volumes:
  kafka-data:
```

Producing and consuming

Send test messages:

```bash
# JSON format, for a basic test
sudo docker exec kafka bash -c "echo '{\"order_id\":\"test-001\",\"customer_id\":\"cust-001\",\"amount\":99.99,\"status\":\"test\"}' | kafka-console-producer --bootstrap-server localhost:9092 --topic orders"

# Avro format
~/.local/bin/uv run python producer_avro.py
```

View the Lambda logs:

```bash
aws --region cn-north-1 logs tail /aws/lambda/kafka-order-consumer --since 2m --format short
```

A successful run logs the following:

```text
Event source: SelfManagedKafka
Processing orders-1: 2 records
Fetching schema version 498aaebe-e863-48c3-b330-fcc3940ea57d...
[orders] p=1 o=6 (Avro) data={'order_id': 'avro-9591de65', 'customer_id': 'cust-002', 'amount': 110.5, 'status': 'pending', 'created_at': '2026-05-24T10:54:40.799489+00:00'}
Processing order: avro-9591de65
```

Log screenshot
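
The handler in this article tells Avro from JSON by attempting Avro deserialization and falling back to JSON when it raises. One possible refinement is to inspect the payload prefix first. The sketch below assumes the framing used by the AWS Glue Schema Registry serializers: one header version byte (3), one compression byte (0 for none, 5 for zlib), then the 16-byte schema version UUID, which is the ID that shows up in the "Fetching schema version ..." log line. The function and type names (`parse_glue_header`, `GlueHeader`) are hypothetical helpers, not part of aws-glue-schema-registry, and the byte layout should be checked against the library version in use.

```python
import uuid
from typing import NamedTuple, Optional

# Assumed Glue Schema Registry framing; verify against your library version.
HEADER_VERSION_BYTE = 3   # first byte of an encoded payload
COMPRESSION_NONE = 0      # second byte: payload is not compressed
COMPRESSION_ZLIB = 5      # second byte: payload is zlib-compressed
HEADER_SIZE = 18          # 1 version byte + 1 compression byte + 16-byte UUID


class GlueHeader(NamedTuple):
    schema_version_id: uuid.UUID
    compressed: bool


def parse_glue_header(payload: bytes) -> Optional[GlueHeader]:
    """Return the parsed header, or None if the payload is not framed this way."""
    if len(payload) < HEADER_SIZE or payload[0] != HEADER_VERSION_BYTE:
        return None
    if payload[1] not in (COMPRESSION_NONE, COMPRESSION_ZLIB):
        return None
    schema_version_id = uuid.UUID(bytes=payload[2:HEADER_SIZE])
    return GlueHeader(schema_version_id, payload[1] == COMPRESSION_ZLIB)


if __name__ == "__main__":
    # A plain JSON message starts with "{" (0x7b), so it is not treated as Avro.
    print(parse_glue_header(b'{"order_id": "test-001"}'))

    # A framed message: header bytes, schema version UUID, then the Avro body.
    version_id = uuid.UUID("498aaebe-e863-48c3-b330-fcc3940ea57d")
    framed = bytes([HEADER_VERSION_BYTE, COMPRESSION_NONE]) + version_id.bytes + b"body"
    print(parse_glue_header(framed))
```

In the handler, a `None` result could route the record straight to `json.loads`, so that a genuine Avro failure (for example a missing `glue:GetSchemaVersion` permission) is not masked by the JSON fallback.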