A RetroSearch Logo

Home - News ( United States | United Kingdom | Italy | Germany ) - Football scores

Search Query:

Showing content from http://yazgoo.github.io/fuse_kafka/html/zookeeper_8c_source.html below:

fuse_kafka: src/zookeeper.c Source File

00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 
00036 
00037 
00038 
00039 
00040 
00041 
00042 
00043 #include <output.h>
00044 #ifndef TEST
00045 #include <librdkafka/rdkafka.h>
00046 #include <zookeeper/zookeeper.h>
00047 #endif
00048 #ifndef FUSE_KAFKA_ZOOKEEPER_C
00049 #define FUSE_KAFKA_ZOOKEEPER_C
00050 #include <jansson.h>
00051 #define BROKER_PATH "/brokers/ids"
00052 #include "server_list.c"
00053 #include <trace.c>
00054 #include <string.h>
00055 static void set_brokerlist_from_zookeeper(zhandle_t *zzh, char *brokers)
00056 {
00057     
00058     if (zzh)
00059     {
00060         struct String_vector brokerlist;
00061         if (zoo_get_children(zzh, BROKER_PATH, 1, &brokerlist) != ZOK)
00062         {
00063             fprintf(stderr, "No brokers found on path %s\n", BROKER_PATH);
00064             return;
00065         }
00066         int i;
00067         char *brokerptr = brokers;
00068         if(brokerptr == NULL) return;
00069         for (i = 0; i < brokerlist.count; i++)
00070         {
00071             char path[255], cfg[1024];
00072             sprintf(path, "/brokers/ids/%s", brokerlist.data[i]);
00073             int len = sizeof(cfg);
00074             zoo_get(zzh, path, 0, cfg, &len, NULL);
00075             if (len > 0)
00076             {
00077                 cfg[len] = '\0';
00078                 json_error_t jerror;
00079                 json_t *jobj = json_loads(cfg, 0, &jerror);
00080                 if (jobj)
00081                 {
00082                     json_t *jhost = json_object_get(jobj, "host");
00083                     json_t *jport = json_object_get(jobj, "port");
00084                     if (jhost && jport)
00085                     {
00086                         const char *host = json_string_value(jhost);
00087                         const int port = json_integer_value(jport);
00088                         
00089                         sprintf(brokerptr, "%s:%d", host, port);
00090                         brokerptr += strlen(brokerptr);
00091                         if (i < brokerlist.count - 1)
00092                         {
00093                             *brokerptr++ = ',';
00094                         }
00095                     }
00096                     json_decref(jobj);
00097                 }
00098             }
00099         }
00100         deallocate_String_vector(&brokerlist);
00101     }
00102 }
00103 void watcher_add_brokers(kafka_t* k, char* brokers, char* topic)
00104 {
00105     rd_kafka_topic_conf_t *topic_conf;
00106     if (brokers[0] != '\0' && k->rk != NULL &&
00107             server_list_add_once(&(k->broker_list), brokers))
00108     {
00109         rd_kafka_brokers_add(k->rk, brokers);
00110         k->no_brokers = 0;
00111         rd_kafka_poll(k->rk, 10);
00112         topic_conf = rd_kafka_topic_conf_new();
00113         k->rkt = rd_kafka_topic_new(k->rk, topic, topic_conf);
00114         if(k->rkt == NULL)
00115             printf("topic %s creation failed\n", topic);
00116     }
00117 }
00118 static void watcher(zhandle_t *zh, int type,
00119         int state, const char *path, void *param)
00120 {
00121     char brokers[1024];
00122     kafka_t* k = (kafka_t*) param;
00123     if(k->conf == NULL) return;
00124     char* topic;
00125     if(k->conf->topic_n <= 0)
00126     {
00127         trace("topic missing");
00128         return;
00129     }
00130     topic = k->conf->topic[0];
00131     
00132     if (k->no_brokers || type == ZOO_CHILD_EVENT && strncmp(
00133                 path, BROKER_PATH, sizeof(BROKER_PATH) - 1) == 0)
00134     {
00135         brokers[0] = '\0';
00136         
00137 
00138         set_brokerlist_from_zookeeper(zh, brokers);
00139         watcher_add_brokers(k, brokers, topic);
00140     }
00141 }
00142 static zhandle_t* initialize_zookeeper(const char * zookeeper, void* param)
00143 {
00144     zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
00145     ((kafka_t*) param)->no_brokers = 1;
00146     ((kafka_t*) param)->broker_list = NULL;
00147     zhandle_t *zh;
00148     
00149     zh = zookeeper_init(zookeeper, watcher, 10000, 0, param, 0);
00150     if (zh == NULL)
00151     {
00152         fprintf(stderr, "Zookeeper connection not established.");
00153         
00154     }
00155     return zh;
00156 }
00157 #endif

RetroSearch is an open source project built by @garambo | Open a GitHub Issue

Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo

HTML: 3.2 | Encoding: UTF-8 | Version: 0.7.4