import threading
import time
import traceback

from base.utils import kafka_helper_copy
from base.utils.kafka_helper import producer_myself
from base.utils.log import logger


def cost_hbase(n):
    """Relay messages from the ``aic.spider.hbase`` topic to ``aic.spider`` forever.

    Each loop iteration calls ``kafka_helper_copy.consumer_myself_k`` for the
    ``aic.spider.hbase`` topic (consumer group ``hbase_es``), bumps a running
    counter, logs it, and hands the result to ``producer_myself`` for the
    ``aic.spider`` topic. The function never returns.

    Args:
        n: Worker index supplied by the caller. It is not read anywhere in
            the body; it is kept so existing callers keep working.

    NOTE(review): presumably ``consumer_myself_k`` blocks until one message is
    available and returns it, so the counter tracks consumed messages —
    confirm against the helper's implementation.
    """
    totalcount = 0
    while True:
        try:
            item_hbase = kafka_helper_copy.consumer_myself_k(topic="aic.spider.hbase", group_id="hbase_es")
            totalcount += 1
            logger.info(f"cost_hbase 目前消费数量 {totalcount} ")
            producer_myself("aic.spider", item_hbase, max_num=200)
        except Exception:
            # Best-effort relay: report the failure, back off briefly, retry.
            # Catch Exception rather than using a bare ``except`` so that
            # SystemExit / KeyboardInterrupt can still terminate the loop.
            traceback.print_exc()
            time.sleep(1)


if __name__ == '__main__':
    # Script entry point: start one relay worker on its own thread, passing
    # it index 0.
    worker_index = 0
    worker = threading.Thread(target=cost_hbase, args=(worker_index,))
    worker.start()
