ERROR backtype.storm.util - Async loop died: java.lang.RuntimeException
I am running a simple word-count topology using the Kafka-Storm integration. I get output when the spout's source is a text file, but when I use the Kafka spout I get the following error.

The error:
ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/randomquery/partitions

7173 [Thread-4-SendThread(localhost:2006)] INFO org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2006. Will not attempt to authenticate using SASL (unknown error)
7174 [Thread-4-SendThread(localhost:2006)] INFO org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/127.0.0.1:2006, initiating session
7175 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2006] WARN org.apache.storm.zookeeper.server.NIOServerCnxn - caught end of stream exception
org.apache.storm.zookeeper.server.ServerCnxn$EndOfStreamException: Unable to read additional data from client sessionid 0x14dadefc40c0012, likely client has closed socket
    at org.apache.storm.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228) ~[storm-core-0.9.4.jar:0.9.4]
    at org.apache.storm.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208) [storm-core-0.9.4.jar:0.9.4]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
[further ZooKeeper client/server session INFO lines trimmed]
7190 [Thread-17-querycounter] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/randomquery/partitions
    at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81) ~[storm-kafka-0.9.4.jar:0.9.4]
    at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42) ~[storm-kafka-0.9.4.jar:0.9.4]
    at storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:57) ~[storm-kafka-0.9.4.jar:0.9.4]
    at storm.kafka.KafkaSpout.open(KafkaSpout.java:87) ~[storm-kafka-0.9.4.jar:0.9.4]
    at backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) ~[storm-core-0.9.4.jar:0.9.4]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/randomquery/partitions
    at storm.kafka.DynamicBrokersReader.getNumPartitions(DynamicBrokersReader.java:94) ~[storm-kafka-0.9.4.jar:0.9.4]
    at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:65) ~[storm-kafka-0.9.4.jar:0.9.4]
    ... 7 common frames omitted
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/randomquery/partitions
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:111) ~[zookeeper-3.4.6.jar:3.4.6-1569965]
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:51) ~[zookeeper-3.4.6.jar:3.4.6-1569965]
    at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1590) ~[zookeeper-3.4.6.jar:3.4.6-1569965]
    at org.apache.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:214) ~[curator-framework-2.5.0.jar:na]
    at org.apache.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:203) ~[curator-framework-2.5.0.jar:na]
    at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107) ~[curator-client-2.5.0.jar:na]
    at org.apache.curator.framework.imps.GetChildrenBuilderImpl.pathInForeground(GetChildrenBuilderImpl.java:199) ~[curator-framework-2.5.0.jar:na]
    at org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:191) ~[curator-framework-2.5.0.jar:na]
    at org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:38) ~[curator-framework-2.5.0.jar:na]
    at storm.kafka.DynamicBrokersReader.getNumPartitions(DynamicBrokersReader.java:91) ~[storm-kafka-0.9.4.jar:0.9.4]
    ... 8 common frames omitted
7191 [Thread-17-querycounter] ERROR backtype.storm.daemon.executor - [the identical RuntimeException and stack trace are logged a second time]
7257 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x14dadefc40c0013 with negotiated timeout 20000 for client /127.0.0.1:51265
[further session-establishment INFO lines trimmed]
7400 [Thread-17-querycounter] ERROR backtype.storm.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.4.jar:0.9.4]
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
    at backtype.storm.daemon.worker$fn__4693$fn__4694.invoke(worker.clj:491) [storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.daemon.executor$mk_executor_data$fn__3272$fn__3273.invoke(executor.clj:240) [storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:473) [storm-core-0.9.4.jar:0.9.4]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
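Reading the trace: storm-kafka's DynamicBrokersReader asks ZooKeeper for the children of `<brokerZkPath>/topics/<topic>/partitions`, and NoNode means that node does not exist in the ZooKeeper the spout connected to. A minimal standalone sketch of how that path is assembled (the class and method names here are mine for illustration, not part of the storm-kafka API):

```java
// Hypothetical helper showing which ZooKeeper node storm-kafka's
// DynamicBrokersReader reads partition metadata from.
public class PartitionPathDemo {
    // Default brokerZkPath that ZkHosts uses when none is given explicitly.
    static final String DEFAULT_BROKER_ZK_PATH = "/brokers";

    static String partitionsPath(String brokerZkPath, String topic) {
        return brokerZkPath + "/topics/" + topic + "/partitions";
    }

    public static void main(String[] args) {
        // This is exactly the node the stack trace reports as missing.
        System.out.println(partitionsPath(DEFAULT_BROKER_ZK_PATH, "randomquery"));
    }
}
```

If that node is missing, either the topic was never created in the Kafka cluster whose metadata lives in that ZooKeeper, or the spout is pointed at the wrong ZooKeeper ensemble or path.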
My topology:
public class TopologyQueryCounterMain {
    static final Logger LOGGER = Logger.getLogger(TopologyQueryCounterMain.class);
    private static final String SPOUT_ID = "querycounter";

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        int numSpoutExecutors = 1;
        LOGGER.debug("this is spoutConfig");
        KafkaSpout kSpout = queryCounter();
        TopologyBuilder builder = new TopologyBuilder();
        LOGGER.debug("this is set spout");
        builder.setSpout(SPOUT_ID, kSpout, numSpoutExecutors);
        LOGGER.debug("this is set bolt");
        builder.setBolt("word-normalizer", new WordNormalizer())
               .shuffleGrouping(SPOUT_ID);
        builder.setBolt("word-counter", new WordCounter(), 1)
               .fieldsGrouping("word-normalizer", new Fields("sentence"));
        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();
        LOGGER.debug("this is submit cluster");
        conf.put(Config.NIMBUS_HOST, "192.168.1.229");
        conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
        System.setProperty("storm.jar",
                "/home/ubuntu/workspace/querycounter/target/querycounter-0.0.1-SNAPSHOT.jar");
        conf.setNumWorkers(20);
        conf.setMaxSpoutPending(5000);
        if (args != null && args.length > 0) {
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            cluster.submitTopology("querycounter", conf, builder.createTopology());
            Utils.sleep(10000);
            cluster.killTopology("querycounter");
            LOGGER.debug("this is shutdown cluster");
            // cluster.shutdown();
        }
    }

    private static KafkaSpout queryCounter() {
        String zkHostPort = "localhost:2181";
        String topic = "randomquery";
        String zkRoot = "/querycounter";
        String zkSpoutId = "querycounter-spout";
        ZkHosts zkHosts = new ZkHosts(zkHostPort);
        LOGGER.debug("this is inside kafka spout cluster");
        SpoutConfig spoutCfg = new SpoutConfig(zkHosts, topic, zkRoot, zkSpoutId);
        spoutCfg.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutCfg);
        return kafkaSpout;
    }
}
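One thing worth double-checking in queryCounter(): with the single-argument constructor, ZkHosts defaults its brokerZkPath to "/brokers", so the spout looks for /brokers/topics/randomquery/partitions in the ZooKeeper at localhost:2181. If the topic has not been created there, or the brokers register under a chroot or custom path, that node will not exist and KafkaSpout.open() fails exactly as in the log. A hedged configuration sketch, assuming a non-default broker path (the "/kafka/brokers" value is purely illustrative, not taken from any real setup):

```java
// If Kafka's metadata lives under a chroot, pass the broker path
// to ZkHosts explicitly instead of relying on the "/brokers" default.
ZkHosts zkHosts = new ZkHosts("localhost:2181", "/kafka/brokers");
SpoutConfig spoutCfg = new SpoutConfig(zkHosts, "randomquery",
        "/querycounter", "querycounter-spout");
spoutCfg.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutCfg);
```

Verifying with a plain ZooKeeper CLI that the partitions node actually exists at localhost:2181 would confirm or rule this out.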
WordNormalizer bolt:
public class WordNormalizer extends BaseBasicBolt {
    static final Logger LOGGER = Logger.getLogger(WordNormalizer.class);

    public void cleanup() {}

    /**
     * The bolt receives a line from the words file and
     * normalizes it: the line is put in lower case before
     * being emitted.
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        LOGGER.debug("this is word_normalizer function");
        sentence = sentence.trim();
        System.out.println("in normalizer : " + sentence);
        if (!sentence.isEmpty()) {
            sentence = sentence.toLowerCase();
            collector.emit(new Values(sentence));
            LOGGER.debug("this is word_normalizer emitting value1");
        }
    }

    /**
     * The bolt emits the field "sentence".
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
        LOGGER.debug("this is word_normalizer emitting value2");
    }
}
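The normalization in execute() is independent of Storm and can be checked on its own. A minimal plain-Java sketch of the same trim / skip-empty / lower-case logic (the class and method names are mine, not part of the topology):

```java
// Standalone sketch of the WordNormalizer logic, no Storm dependencies.
public class NormalizeDemo {
    // Returns the normalized sentence, or null when nothing would be emitted.
    static String normalize(String sentence) {
        String s = sentence.trim();
        return s.isEmpty() ? null : s.toLowerCase();
    }

    public static void main(String[] args) {
        System.out.println(normalize("  Hello World  ")); // prints "hello world"
        System.out.println(normalize("   "));             // prints "null"
    }
}
```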
WordCounter bolt:
public class WordCounter extends BaseBasicBolt {
    static final Logger LOGGER = Logger.getLogger(WordCounter.class);
    Integer id;
    String name;
    Map<String, Integer> counters;

    /**
     * At the end of the spout (when the cluster is shut down)
     * we show the word counters.
     */
    @Override
    public void cleanup() {
        System.out.println("-- word counter [" + name + "-" + id + "] --");
        for (Map.Entry<String, Integer> entry : counters.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
            LOGGER.debug("this is word_counter cleanup function");
        }
    }

    /**
     * On create
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = input.getString(0);
        /*
         * If the word doesn't exist in the map we create an
         * entry for it; otherwise we add 1 to its count.
         */
        LOGGER.debug("this is word_counter execute function");
        if (!counters.containsKey(str)) {
            counters.put(str, 1);
        } else {
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
            System.out.println("in counter:" + c);
        }
    }
}
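For what it's worth, the counting logic in execute() is also Storm-independent; the if/else over the map can be collapsed with Map.merge (Java 8+). A minimal standalone sketch (names are mine, for illustration only):

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the WordCounter counting logic.
public class CountDemo {
    static Map<String, Integer> count(String[] words) {
        Map<String, Integer> counters = new HashMap<>();
        for (String w : words) {
            // Insert 1 for a new word, otherwise add 1 to the existing count.
            counters.merge(w, 1, Integer::sum);
        }
        return counters;
    }

    public static void main(String[] args) {
        System.out.println(count(new String[]{"a", "b", "a"})); // a -> 2, b -> 1
    }
}
```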
What modification do I need to make?