00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 00025 00026 00027 00028 00029 00030 00031 00032 00033 00034 00035 00036 00037 00038 00039 00040 00041 00042 00043 #include <output.h> 00044 #ifndef TEST 00045 #include <librdkafka/rdkafka.h> 00046 #include <zookeeper/zookeeper.h> 00047 #endif 00048 #ifndef FUSE_KAFKA_ZOOKEEPER_C 00049 #define FUSE_KAFKA_ZOOKEEPER_C 00050 #include <jansson.h> 00051 #define BROKER_PATH "/brokers/ids" 00052 #include "server_list.c" 00053 #include <trace.c> 00054 #include <string.h> 00055 static void set_brokerlist_from_zookeeper(zhandle_t *zzh, char *brokers) 00056 { 00057 00058 if (zzh) 00059 { 00060 struct String_vector brokerlist; 00061 if (zoo_get_children(zzh, BROKER_PATH, 1, &brokerlist) != ZOK) 00062 { 00063 fprintf(stderr, "No brokers found on path %s\n", BROKER_PATH); 00064 return; 00065 } 00066 int i; 00067 char *brokerptr = brokers; 00068 if(brokerptr == NULL) return; 00069 for (i = 0; i < brokerlist.count; i++) 00070 { 00071 char path[255], cfg[1024]; 00072 sprintf(path, "/brokers/ids/%s", brokerlist.data[i]); 00073 int len = sizeof(cfg); 00074 zoo_get(zzh, path, 0, cfg, &len, NULL); 00075 if (len > 0) 00076 { 00077 cfg[len] = '\0'; 00078 json_error_t jerror; 00079 json_t *jobj = json_loads(cfg, 0, &jerror); 00080 if (jobj) 00081 { 00082 json_t *jhost = json_object_get(jobj, "host"); 00083 json_t *jport = json_object_get(jobj, "port"); 00084 if (jhost && jport) 00085 { 00086 const char *host = json_string_value(jhost); 00087 const int port = json_integer_value(jport); 00088 00089 sprintf(brokerptr, "%s:%d", host, port); 00090 brokerptr += strlen(brokerptr); 00091 if (i < brokerlist.count - 1) 00092 { 00093 *brokerptr++ = ','; 00094 } 00095 } 00096 json_decref(jobj); 00097 } 00098 } 00099 } 00100 deallocate_String_vector(&brokerlist); 00101 } 00102 } 00103 void watcher_add_brokers(kafka_t* k, char* brokers, char* topic) 00104 { 00105 rd_kafka_topic_conf_t *topic_conf; 00106 if (brokers[0] != '\0' && k->rk != NULL && 00107 server_list_add_once(&(k->broker_list), brokers)) 00108 { 00109 rd_kafka_brokers_add(k->rk, brokers); 00110 k->no_brokers = 0; 00111 rd_kafka_poll(k->rk, 10); 00112 topic_conf = rd_kafka_topic_conf_new(); 00113 k->rkt = rd_kafka_topic_new(k->rk, topic, topic_conf); 00114 if(k->rkt == NULL) 00115 printf("topic %s creation failed\n", topic); 00116 } 00117 } 00118 static void watcher(zhandle_t *zh, int type, 00119 int state, const char *path, void *param) 00120 { 00121 char brokers[1024]; 00122 kafka_t* k = (kafka_t*) param; 00123 if(k->conf == NULL) return; 00124 char* topic; 00125 if(k->conf->topic_n <= 0) 00126 { 00127 trace("topic missing"); 00128 return; 00129 } 00130 topic = k->conf->topic[0]; 00131 00132 if (k->no_brokers || type == ZOO_CHILD_EVENT && strncmp( 00133 path, BROKER_PATH, sizeof(BROKER_PATH) - 1) == 0) 00134 { 00135 brokers[0] = '\0'; 00136 00137 00138 set_brokerlist_from_zookeeper(zh, brokers); 00139 watcher_add_brokers(k, brokers, topic); 00140 } 00141 } 00142 static zhandle_t* initialize_zookeeper(const char * zookeeper, void* param) 00143 { 00144 zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR); 00145 ((kafka_t*) param)->no_brokers = 1; 00146 ((kafka_t*) param)->broker_list = NULL; 00147 zhandle_t *zh; 00148 00149 zh = zookeeper_init(zookeeper, watcher, 10000, 0, param, 0); 00150 if (zh == NULL) 00151 { 00152 fprintf(stderr, "Zookeeper connection not established."); 00153 00154 } 00155 return zh; 00156 } 00157 #endif
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4