ERROR backtype.storm.util - Async loop died java.lang.RuntimeException: java.lang.RuntimeException -


i running simple word count topology using kafka storm integration.i getting output when using spout source text file.but when using kafka spout getting following error.

the error :

error backtype.storm.util - async loop died! java.lang.runtimeexception: java.lang.runtimeexception: org.apache.zookeeper.keeperexception$nonodeexception: keepererrorcode = nonode /brokers/topics/randomquery/partitions

7173 [thread-4-sendthread(localhost:2006)] info  org.apache.storm.zookeeper.clientcnxn - opening socket connection server localhost/127.0.0.1:2006. not attempt authenticate using sasl (unknown error) 7174 [thread-4-sendthread(localhost:2006)] info  org.apache.storm.zookeeper.clientcnxn - socket connection established localhost/127.0.0.1:2006, initiating session 7175 [nioservercxn.factory:0.0.0.0/0.0.0.0:2006] warn  org.apache.storm.zookeeper.server.nioservercnxn - caught end of stream exception org.apache.storm.zookeeper.server.servercnxn$endofstreamexception: unable read additional data client sessionid 0x14dadefc40c0012, client has closed socket @ org.apache.storm.zookeeper.server.nioservercnxn.doio(nioservercnxn.java:228) ~[storm-core-0.9.4.jar:0.9.4] @ org.apache.storm.zookeeper.server.nioservercnxnfactory.run(nioservercnxnfactory.java:208) [storm-core-0.9.4.jar:0.9.4] @ java.lang.thread.run(thread.java:745) [na:1.7.0_79] 7175 [thread-17-querycounter-eventthread] info  org.apache.curator.framework.state.connectionstatemanager - state change: connected 7175 [nioservercxn.factory:0.0.0.0/0.0.0.0:2006] info  org.apache.storm.zookeeper.server.nioservercnxn - closed socket connection client /127.0.0.1:51264 had sessionid 0x14dadefc40c0012 7176 [nioservercxn.factory:0.0.0.0/0.0.0.0:2006] info  org.apache.storm.zookeeper.server.nioservercnxnfactory - accepted socket connection /127.0.0.1:51267 7176 [nioservercxn.factory:0.0.0.0/0.0.0.0:2006] info  org.apache.storm.zookeeper.server.zookeeperserver - client attempting establish new session @ /127.0.0.1:51267 7190 [thread-17-querycounter] error backtype.storm.util - async loop died! java.lang.runtimeexception: java.lang.runtimeexception: org.apache.zookeeper.keeperexception$nonodeexception: keepererrorcode = nonode /brokers/topics/randomquery/partitions @ storm.kafka.dynamicbrokersreader.getbrokerinfo(dynamicbrokersreader.java:81) ~[storm-kafka-0.9.4.jar:0.9.4] @ storm.kafka.trident.zkbrokerreader.<init>(zkbrokerreader.java:42) ~[storm-kafka-0.9.4.jar:0.9.4] @ storm.kafka.kafkautils.makebrokerreader(kafkautils.java:57) ~[storm-kafka-0.9.4.jar:0.9.4] @ storm.kafka.kafkaspout.open(kafkaspout.java:87) ~[storm-kafka-0.9.4.jar:0.9.4] @ backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522) ~[storm-core-0.9.4.jar:0.9.4] @ backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) ~[storm-core-0.9.4.jar:0.9.4] @ clojure.lang.afn.run(afn.java:24) [clojure-1.5.1.jar:na] @ java.lang.thread.run(thread.java:745) [na:1.7.0_79] caused by: java.lang.runtimeexception: org.apache.zookeeper.keeperexception$nonodeexception: keepererrorcode = nonode /brokers/topics/randomquery/partitions @ storm.kafka.dynamicbrokersreader.getnumpartitions(dynamicbrokersreader.java:94) ~[storm-kafka-0.9.4.jar:0.9.4] @ storm.kafka.dynamicbrokersreader.getbrokerinfo(dynamicbrokersreader.java:65) ~[storm-kafka-0.9.4.jar:0.9.4] ... 7 common frames omitted caused by: org.apache.zookeeper.keeperexception$nonodeexception: keepererrorcode = nonode /brokers/topics/randomquery/partitions @ org.apache.zookeeper.keeperexception.create(keeperexception.java:111) ~[zookeeper-3.4.6.jar:3.4.6-1569965] @ org.apache.zookeeper.keeperexception.create(keeperexception.java:51) ~[zookeeper-3.4.6.jar:3.4.6-1569965] @ org.apache.zookeeper.zookeeper.getchildren(zookeeper.java:1590) ~[zookeeper-3.4.6.jar:3.4.6-1569965] @ org.apache.curator.framework.imps.getchildrenbuilderimpl$3.call(getchildrenbuilderimpl.java:214) ~[curator-framework-2.5.0.jar:na] @ org.apache.curator.framework.imps.getchildrenbuilderimpl$3.call(getchildrenbuilderimpl.java:203) ~[curator-framework-2.5.0.jar:na] @ org.apache.curator.retryloop.callwithretry(retryloop.java:107) ~[curator-client-2.5.0.jar:na] @ org.apache.curator.framework.imps.getchildrenbuilderimpl.pathinforeground(getchildrenbuilderimpl.java:199) ~[curator-framework-2.5.0.jar:na] @ org.apache.curator.framework.imps.getchildrenbuilderimpl.forpath(getchildrenbuilderimpl.java:191) ~[curator-framework-2.5.0.jar:na] @ org.apache.curator.framework.imps.getchildrenbuilderimpl.forpath(getchildrenbuilderimpl.java:38) ~[curator-framework-2.5.0.jar:na] @ storm.kafka.dynamicbrokersreader.getnumpartitions(dynamicbrokersreader.java:91) ~[storm-kafka-0.9.4.jar:0.9.4] ... 8 common frames omitted 7191 [thread-17-querycounter] error backtype.storm.daemon.executor -  java.lang.runtimeexception: java.lang.runtimeexception: org.apache.zookeeper.keeperexception$nonodeexception: keepererrorcode = nonode /brokers/topics/randomquery/partitions @ storm.kafka.dynamicbrokersreader.getbrokerinfo(dynamicbrokersreader.java:81) ~[storm-kafka-0.9.4.jar:0.9.4] @ storm.kafka.trident.zkbrokerreader.<init>(zkbrokerreader.java:42) ~[storm-kafka-0.9.4.jar:0.9.4] @ storm.kafka.kafkautils.makebrokerreader(kafkautils.java:57) ~[storm-kafka-0.9.4.jar:0.9.4] @ storm.kafka.kafkaspout.open(kafkaspout.java:87) ~[storm-kafka-0.9.4.jar:0.9.4] @ backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522) ~[storm-core-0.9.4.jar:0.9.4] @ backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) ~[storm-core-0.9.4.jar:0.9.4] @ clojure.lang.afn.run(afn.java:24) [clojure-1.5.1.jar:na] @ java.lang.thread.run(thread.java:745) [na:1.7.0_79] caused by: java.lang.runtimeexception: org.apache.zookeeper.keeperexception$nonodeexception: keepererrorcode = nonode /brokers/topics/randomquery/partitions @ storm.kafka.dynamicbrokersreader.getnumpartitions(dynamicbrokersreader.java:94) ~[storm-kafka-0.9.4.jar:0.9.4] @ storm.kafka.dynamicbrokersreader.getbrokerinfo(dynamicbrokersreader.java:65) ~[storm-kafka-0.9.4.jar:0.9.4] ... 7 common frames omitted caused by: org.apache.zookeeper.keeperexception$nonodeexception: keepererrorcode = nonode /brokers/topics/randomquery/partitions @ org.apache.zookeeper.keeperexception.create(keeperexception.java:111) ~[zookeeper-3.4.6.jar:3.4.6-1569965] @ org.apache.zookeeper.keeperexception.create(keeperexception.java:51) ~[zookeeper-3.4.6.jar:3.4.6-1569965] @ org.apache.zookeeper.zookeeper.getchildren(zookeeper.java:1590) ~[zookeeper-3.4.6.jar:3.4.6-1569965] @ org.apache.curator.framework.imps.getchildrenbuilderimpl$3.call(getchildrenbuilderimpl.java:214) ~[curator-framework-2.5.0.jar:na] @ org.apache.curator.framework.imps.getchildrenbuilderimpl$3.call(getchildrenbuilderimpl.java:203) ~[curator-framework-2.5.0.jar:na] @ org.apache.curator.retryloop.callwithretry(retryloop.java:107) ~[curator-client-2.5.0.jar:na] @ org.apache.curator.framework.imps.getchildrenbuilderimpl.pathinforeground(getchildrenbuilderimpl.java:199) ~[curator-framework-2.5.0.jar:na] @ org.apache.curator.framework.imps.getchildrenbuilderimpl.forpath(getchildrenbuilderimpl.java:191) ~[curator-framework-2.5.0.jar:na] @ org.apache.curator.framework.imps.getchildrenbuilderimpl.forpath(getchildrenbuilderimpl.java:38) ~[curator-framework-2.5.0.jar:na] @ storm.kafka.dynamicbrokersreader.getnumpartitions(dynamicbrokersreader.java:91) ~[storm-kafka-0.9.4.jar:0.9.4] ... 8 common frames omitted  7257 [syncthread:0] info  org.apache.storm.zookeeper.server.zookeeperserver - established session 0x14dadefc40c0013 negotiated timeout 20000 client /127.0.0.1:51265 7257 [thread-17-querycounter-eventthread] info  org.apache.curator.framework.state.connectionstatemanager - state change: connected 7268 [syncthread:0] info  org.apache.storm.zookeeper.server.zookeeperserver - established session 0x14dadefc40c0014 negotiated timeout 20000 client /127.0.0.1:51267 7268 [thread-4-sendthread(localhost:2006)] info  org.apache.storm.zookeeper.clientcnxn - session establishment complete on server localhost/127.0.0.1:2006, sessionid = 0x14dadefc40c0014, negotiated timeout = 20000 7269 [thread-4-eventthread] info  org.apache.storm.curator.framework.state.connectionstatemanager - state change: connected     7306 [thread-4] info  backtype.storm.daemon.worker - reading assignments. 7400 [thread-17-querycounter] error backtype.storm.util - halting process: ("worker died") java.lang.runtimeexception: ("worker died") @ backtype.storm.util$exit_process_bang_.doinvoke(util.clj:325) [storm-core-0.9.4.jar:0.9.4] @ clojure.lang.restfn.invoke(restfn.java:423) [clojure-1.5.1.jar:na] @ backtype.storm.daemon.worker$fn__4693$fn__4694.invoke(worker.clj:491) [storm-core-0.9.4.jar:0.9.4] @ backtype.storm.daemon.executor$mk_executor_data$fn__3272$fn__3273.invoke(executor.clj:240) [storm-core-0.9.4.jar:0.9.4] @ backtype.storm.util$async_loop$fn__460.invoke(util.clj:473) [storm-core-0.9.4.jar:0.9.4] @ clojure.lang.afn.run(afn.java:24) [clojure-1.5.1.jar:na] @ java.lang.thread.run(thread.java:745) [na:1.7.0_79] 

my topology:

public class topologyquerycountermain {   static final logger logger = logger.getlogger(topologyquerycountermain.class);   private static final string spout_id = "querycounter";   public static void main(string[] args) throws alreadyaliveexception, invalidtopologyexception {      int numspoutexecutors = 1;     logger.debug("this spoutconfig");     kafkaspout kspout = querycounter();     topologybuilder builder = new topologybuilder();     logger.debug("this set spout");     builder.setspout(spout_id, kspout, numspoutexecutors);     logger.debug("this set bolt");     builder.setbolt("word-normalizer", new wordnormalizer())         .shufflegrouping(spout_id);     builder.setbolt("word-counter", new wordcounter(),1)         .fieldsgrouping("word-normalizer", new fields("sentence"));       config conf = new config();     localcluster cluster = new localcluster();     logger.debug("this submit cluster");     conf.put(config.nimbus_host, "192.168.1.229");     conf.put(config.nimbus_thrift_port, 6627);      system.setproperty("storm.jar", "/home/ubuntu/workspace/querycounter/target/querycounter-0.0.1-snapshot.jar");     conf.setnumworkers(20);     conf.setmaxspoutpending(5000);      if (args != null && args.length > 0) {         stormsubmitter. submittopology(args[0], conf, builder.createtopology());     }      else     {            cluster.submittopology("querycounter", conf, builder.createtopology());         utils.sleep(10000);         cluster.killtopology("querycounter");         logger.debug("this shutdown cluster");         //cluster.shutdown();     } }   private static kafkaspout querycounter() {     string zkhostport = "localhost:2181";     string topic = "randomquery";      string zkroot = "/querycounter";     string zkspoutid = "querycounter-spout";     zkhosts zkhosts = new zkhosts(zkhostport);      logger.debug("this inside kafka spout cluster");     spoutconfig spoutcfg = new spoutconfig(zkhosts, topic, zkroot, zkspoutid);     spoutcfg.scheme=new schemeasmultischeme(new stringscheme());     kafkaspout kafkaspout = new kafkaspout(spoutcfg);     return kafkaspout;   }  } 

wordnormalizer bolt

public class wordnormalizer extends basebasicbolt { static final logger logger = logger.getlogger(wordnormalizer.class); public void cleanup() {}  /**  * bolt receive line  * words file , process normalize line  *   * normalize put words in lower case  * , split line words in   */ public void execute(tuple input, basicoutputcollector collector) {     string sentence = input.getstring(0);     logger.debug("this word_normalizer funtion");          sentence = sentence.trim();         system.out.println("in normalizer : "+sentence);         if(!sentence.isempty()){             sentence = sentence.tolowercase();             collector.emit(new values(sentence));             logger.debug("this word_normalizer emitting value1");         }     }    /**  * bolt emit field "word"   */ public void declareoutputfields(outputfieldsdeclarer declarer) {     declarer.declare(new fields("sentence"));     logger.debug("this word_normalizer emitting value2"); } } 

wordcounter

public class wordcounter extends basebasicbolt { static final logger logger = logger.getlogger(wordcounter.class); integer id; string name; map<string, integer> counters;  /**  * @ end of spout (when cluster shutdown  * show word counters  */ @override public void cleanup() {     system.out.println("-- word counter ["+name+"-"+id+"] --");     for(map.entry<string, integer> entry : counters.entryset()){         system.out.println(entry.getkey()+": "+entry.getvalue());         logger.debug("this word_counter cleanup funtion");     } }  /**  * on create   */ @override public void prepare(map stormconf, topologycontext context) {     this.counters = new hashmap<string, integer>();     this.name = context.getthiscomponentid();     this.id = context.getthistaskid(); }  @override public void declareoutputfields(outputfieldsdeclarer declarer) {}   @override public void execute(tuple input, basicoutputcollector collector) {     string str = input.getstring(0);     /**      * if word dosn't exist in map create      * this, if not add 1       */logger.debug("this word_counter execute funtion");     if(!counters.containskey(str)){         counters.put(str, 1);     }else{         integer c = counters.get(str) + 1;         counters.put(str, c);         system.out.println("in counter:" + c);     }  } } 

what modification need do?


Comments

Popular posts from this blog

angularjs - ADAL JS Angular- WebAPI add a new role claim to the token -

php - CakePHP HttpSockets send array of paramms -

node.js - Using Node without global install -