#ifndef CSCI_KAFKA_HANDLER_H #define CSCI_KAFKA_HANDLER_H #include #include #include #include "../config/kafka_config.h" #include "../include/kafka/rdkafkacpp.h" class KafkaHandler: public QObject{ Q_OBJECT private: static bool _run; static RdKafka::Producer* _producer; private: static void _initConsumer(const std::string& servers, const QStringList& topics); static void _consumerStart(RdKafka::KafkaConsumer * consumer); static void _receive(RdKafka::Message *message); static void _initProducer(const std::string& servers); public: static void init(); static void stop(); static KafkaHandler* getIns(); static bool message(const QString& topic, const QString& message, const QString& key = nullptr, RdKafka::Headers *headers = RdKafka::Headers::create(), int partition = RdKafka::Topic::PARTITION_UA, void * que = nullptr); public: KafkaHandler(); public: ~KafkaHandler(); public: signals: void handle(const QString & key, const QString & message); }; #endif //CSCI_KAFKA_HANDLER_H