#include "kafka_handler.h" #include bool KafkaHandler::_run = false; RdKafka::Producer* KafkaHandler::_producer = nullptr; void KafkaHandler::init() { auto conf = KafkaConfig::getIns(); auto consumerServers = conf->getConsumerServers().join(",").toStdString(); auto consumerTopics = conf->getConsumerTopics(); auto producer = conf->getProducerServers().join(",").toStdString(); if(consumerTopics.empty()){ qFatal("kafka consumer topic can`t empty !"); } // 初始化 consumer _run = true; _initConsumer(consumerServers, consumerTopics); // 实例化 producer _initProducer(producer); } void KafkaHandler::_initConsumer(const std::string& servers, const QStringList& topics) { auto conf=RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); std::string err; if(conf->set("bootstrap.servers", servers, err) != RdKafka::Conf::CONF_OK){ qFatal("kafka consumer set bootstrap servers error: %s", &err); } if(conf->set("group.id",topics.join(",").toStdString(),err) != RdKafka::Conf::CONF_OK){ qFatal("kafka consumer set group id error: %s", &err); } //创建消费者 auto consumer= RdKafka::KafkaConsumer::create(conf,err); if (!consumer) { qFatal("create consumer error: %s", &err); } QVector vector; for (const auto &item: topics.toVector()){ vector.push_back(item.toStdString()); } consumer->subscribe(vector.toStdVector()); QtConcurrent::run(_consumerStart, consumer); } void KafkaHandler::_consumerStart(RdKafka::KafkaConsumer * consumer) { while (_run){ RdKafka::Message *msg = consumer->consume(100); _receive(msg); } consumer->close(); delete consumer; } void KafkaHandler::_receive(RdKafka::Message *message) { switch (message->err()) { case RdKafka::ERR__TIMED_OUT: break; case RdKafka::ERR_NO_ERROR: { // TODO key 校验 message->key() QString key; if(message->key()){ key = QString::fromStdString(*message->key()); // qDebug() << "receive message key:" << key; } // TODO header 校验 message->headers() auto headers = message->headers(); if (headers) { std::vector hdrs = headers->get_all(); for (size_t i = 0; i < hdrs.size(); i++) { const RdKafka::Headers::Header hdr = hdrs[i]; if (hdr.value() != nullptr){ std::string value (static_cast(hdr.value()), hdr.value_size()); // qDebug() << "receive message headers: " << QString::fromStdString(hdr.key()) // << "-" << QString::fromStdString(value); } } } std::string payload (static_cast(message->payload()), message->len()); auto msg = QString::fromStdString(payload); // qDebug() << "receive message payload:" << msg; emit getIns()->handle(key, msg); } break; case RdKafka::ERR__PARTITION_EOF: break; case RdKafka::ERR__UNKNOWN_TOPIC: case RdKafka::ERR__UNKNOWN_PARTITION: qCritical("consume failed: %s", qPrintable(QString::fromStdString(message->errstr()))); _run = false; break; default: qWarning("consume failed: %s" , qPrintable(QString::fromStdString(message->errstr()))); _run = false; break; } } void KafkaHandler::stop() { _run = false; } void KafkaHandler::_initProducer(const std::string &servers) { auto conf= RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); std::string err; if(conf->set("bootstrap.servers", servers, err) != RdKafka::Conf::CONF_OK){ qFatal("kafka consumer set bootstrap servers error: %s", &err); } //创建生产者 _producer = RdKafka::Producer::create(conf,err); if(!_producer) { qFatal("create producer error: %s", &err); } } bool KafkaHandler::message(const QString& topic, const QString& message,const QString& key, RdKafka::Headers *headers, int partition, void * que) { auto payload = message.toStdString(); std::string k; if(nullptr != key){ k = key.toStdString(); } auto code = _producer->produce(topic.toStdString(), partition, RdKafka::Producer::RK_MSG_COPY, const_cast(payload.c_str()), payload.size(), const_cast(k.c_str()), k.size(), 0, headers, que ); return RdKafka::ERR_NO_ERROR == code; } KafkaHandler *KafkaHandler::getIns() { static KafkaHandler kafkaHandler; return &kafkaHandler; } KafkaHandler::KafkaHandler() = default; KafkaHandler::~KafkaHandler() = default;