您的位置:首页 > 游戏 > 游戏 > 哈尔滨建设网站官网_网页制作素材在哪里找_sem是什么品牌_网页制作软件推荐

哈尔滨建设网站官网_网页制作素材在哪里找_sem是什么品牌_网页制作软件推荐

2025/5/7 5:18:46 来源:https://blog.csdn.net/weixin_39810558/article/details/146986582  浏览:    关键词:哈尔滨建设网站官网_网页制作素材在哪里找_sem是什么品牌_网页制作软件推荐
哈尔滨建设网站官网_网页制作素材在哪里找_sem是什么品牌_网页制作软件推荐

新建项目

mkdir python-kafka-test
cd python-kafka-test

安装依赖

pip install confluent_kafka

创建配置文件

# Kafka配置文件# Kafka服务器配置
KAFKA_CONFIG = {'bootstrap.servers': 'localhost:9092',# 生产者特定配置'producer': {'client.id': 'python-kafka-producer','acks': 'all',                 # 确保消息被所有副本确认'retries': 3,                  # 重试次数'retry.backoff.ms': 1000,      # 重试间隔'batch.size': 16384,           # 批处理大小'linger.ms': 5,                # 等待时间以允许更多消息加入批次'compression.type': 'snappy',  # 压缩类型},# 消费者特定配置'consumer': {'group.id': 'notification-group','auto.offset.reset': 'earliest','enable.auto.commit': True,'auto.commit.interval.ms': 5000,'session.timeout.ms': 30000,'max.poll.interval.ms': 300000,'heartbeat.interval.ms': 10000,}
}# 主题配置
TOPICS = {'email': 'email-topic','sms': 'sms-topic'
}

创建Kafka生产者

import json
import logging
import signal
import sys
from confluent_kafka import Producer
from config import KAFKA_CONFIG, TOPICS# 配置日志
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',handlers=[logging.StreamHandler()]
)
logger = logging.getLogger('kafka-producer')# 合并配置
producer_config = {**KAFKA_CONFIG, **KAFKA_CONFIG.get('producer', {})}
# 移除嵌套的producer配置,避免冲突
if 'producer' in producer_config:del producer_config['producer']
if 'consumer' in producer_config:del producer_config['consumer']# 创建Producer实例
p = Producer(producer_config)# 标记是否正在关闭
shutting_down = Falsedef signal_handler(sig, frame):"""处理终止信号,确保优雅关闭"""global shutting_downif shutting_down:returnshutting_down = Truelogger.info("接收到终止信号,正在优雅关闭...")# 确保所有消息都被发送p.flush(10)  # 等待最多10秒logger.info("生产者已关闭")sys.exit(0)# 注册信号处理器
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)def delivery_report(err, msg):"""消息发送回调函数"""if err is not None:logger.error(f'消息发送失败: {err}')else:logger.info(f'消息已发送到 {msg.topic()} [分区 {msg.partition()}]')def send_notification(topic_key, payload, key=None):"""发送通知消息到指定主题Args:topic_key: 主题键名(在TOPICS字典中定义)payload: 消息内容(字典或JSON字符串)key: 可选的消息键Returns:bool: 是否成功将消息加入发送队列"""try:# 获取实际主题名topic = TOPICS.get(topic_key, topic_key)# 如果payload是字典,转换为JSON字符串if isinstance(payload, dict):payload = json.dumps(payload)# 发送消息p.produce(topic, payload.encode('utf-8'), key=key.encode('utf-8') if key else None,callback=delivery_report)# 轮询一次以触发回调p.poll(0)logger.info(f'消息已加入发送队列: {topic}')return Trueexcept Exception as e:logger.error(f'发送消息时出错: {e}')return False# 使用示例
if __name__ == "__main__":try:# 发送邮件通知email_payload = {"to": "receiver@example.com", "from": "sender@example.com", "subject": "Sample Email", "body": "This is a sample email notification"}send_notification('email', email_payload)# 发送短信通知sms_payload = {"phoneNumber": "1234567890", "message": "This is a sample SMS notification"}send_notification('sms', sms_payload)# 确保所有消息都被发送remaining = p.flush(timeout=5)if remaining > 0:logger.warning(f'仍有 {remaining} 条消息未发送完成')else:logger.info('所有消息已成功发送')except KeyboardInterrupt:logger.info("程序被用户中断")except Exception as e:logger.error(f"发生错误: {e}")finally:# 确保所有消息都被发送p.flush(timeout=5)

创建Kafka消费者

import json
import logging
import signal
import sys
from confluent_kafka import Consumer, KafkaError
from config import KAFKA_CONFIG, TOPICS# 配置日志
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',handlers=[logging.StreamHandler()]
)
logger = logging.getLogger('kafka-consumer')# 合并配置
consumer_config = {**KAFKA_CONFIG, **KAFKA_CONFIG.get('consumer', {})}
# 移除嵌套的配置,避免冲突
if 'producer' in consumer_config:del consumer_config['producer']
if 'consumer' in consumer_config:del consumer_config['consumer']# 创建Consumer实例
c = Consumer(consumer_config)# 标记是否正在关闭
shutting_down = Falsedef signal_handler(sig, frame):"""处理终止信号,确保优雅关闭"""global shutting_downif shutting_down:returnshutting_down = Truelogger.info("接收到终止信号,正在优雅关闭...")sys.exit(0)# 注册信号处理器
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)def process_message(msg):"""处理接收到的消息Args:msg: Kafka消息对象"""try:topic = msg.topic()value = msg.value().decode("utf-8")key = msg.key().decode("utf-8") if msg.key() else None# 尝试解析JSONtry:payload = json.loads(value)logger.info(f'接收到消息 [主题: {topic}, 键: {key}]')logger.debug(f'消息内容: {payload}')except json.JSONDecodeError:logger.info(f'接收到非JSON消息 [主题: {topic}, 键: {key}]: {value}')# 根据主题类型处理不同的消息if topic == TOPICS['email']:handle_email_notification(payload if 'payload' in locals() else value)elif topic == TOPICS['sms']:handle_sms_notification(payload if 'payload' in locals() else value)else:logger.warning(f'收到未知主题的消息: {topic}')except Exception as e:logger.error(f'处理消息时出错: {e}')def handle_email_notification(payload):"""处理邮件通知"""# 这里实现实际的邮件发送逻辑logger.info(f'处理邮件通知: {payload}')def handle_sms_notification(payload):"""处理短信通知"""# 这里实现实际的短信发送逻辑logger.info(f'处理短信通知: {payload}')def main():try:# 订阅主题topics_to_subscribe = list(TOPICS.values())logger.info(f'订阅主题: {topics_to_subscribe}')c.subscribe(topics_to_subscribe)logger.info('开始消费消息...')while not shutting_down:msg = c.poll(1.0)  # 超时时间1秒if msg is None:continueif msg.error():if msg.error().code() == KafkaError._PARTITION_EOF:# 到达分区末尾,不是错误logger.debug(f'到达分区末尾: {msg.topic()} [{msg.partition()}]')continueelse:# 其他错误logger.error(f'Kafka错误: {msg.error()}')break# 处理消息process_message(msg)except KeyboardInterrupt:logger.info("程序被用户中断")except Exception as e:logger.error(f"发生错误: {e}")finally:# 关闭消费者logger.info("关闭消费者...")c.close()logger.info("消费者已关闭")if __name__ == "__main__":main()

运行项目

打开终端运行命令

python producer.py
python consumer.py

可以看到终端输出正常

详细代码:https://github.com/wan88888/python-kafka-test

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com