prepare method executing multiple times


Hi, I am creating a topology with Apache Storm in which my spout collects data from a Kafka topic and sends it to a bolt.

In that bolt I validate each tuple and emit it again on a separate stream for another bolt.

Now the issue is that my second bolt, which consumes the first bolt's stream, has an overridden method prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) that is executed repeatedly, roughly every 2 seconds.

Code for the topology:

    topologyBuilder.setBolt("abc", new ValidationBolt()).shuffleGrouping(configurations.SPOUT_ID);
    topologyBuilder.setBolt("TEST", new TestBolt()).shuffleGrouping("abc", Utils.VALIDATED_STREAM);

Code for the first bolt, "abc":

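    @Override
    public void execute(Tuple tuple) {
        // The raw document is read from field index 4 of the incoming tuple
        String document = String.valueOf(tuple.getValue(4));
        // Only valid JSON documents are forwarded, on the named stream
        if (Utils.isJSONValid(document)) {
            outputCollector.emit(Utils.VALIDATED_STREAM, new Values(document));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Declares VALIDATED_STREAM with a single field, "document"
        outputFieldsDeclarer.declareStream(Utils.VALIDATED_STREAM, new Fields("document"));
    }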
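For readers less familiar with named streams, the wiring above can be pictured with a toy router in plain Java. This is only a sketch and not the Storm API: `ToyStreamRouter` is a hypothetical class, and the string "validated" is an assumed placeholder for the value of `Utils.VALIDATED_STREAM`, which is not shown in the question. A subscriber registered for the pair ("abc", stream) receives only what "abc" emits on that stream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy router, NOT the Storm API: a downstream component subscribes to one
// (componentId, streamId) pair and only receives values emitted on that pair.
class ToyStreamRouter {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    private static String key(String componentId, String streamId) {
        return componentId + "/" + streamId;
    }

    void subscribe(String componentId, String streamId, Consumer<String> consumer) {
        subscribers.computeIfAbsent(key(componentId, streamId), k -> new ArrayList<>()).add(consumer);
    }

    void emit(String componentId, String streamId, String value) {
        // Deliver only to subscribers of this exact component/stream pair
        for (Consumer<String> c : subscribers.getOrDefault(key(componentId, streamId), List.of())) {
            c.accept(value);
        }
    }

    public static void main(String[] args) {
        ToyStreamRouter router = new ToyStreamRouter();
        List<String> received = new ArrayList<>();
        // Plays the role of shuffleGrouping("abc", Utils.VALIDATED_STREAM) for the "TEST" bolt
        router.subscribe("abc", "validated", received::add);
        router.emit("abc", "validated", "{\"ok\":true}");
        router.emit("abc", "other", "not delivered to TEST");
        System.out.println(received);
    }
}
```

Only the value emitted on the subscribed stream reaches the consumer; the toy ignores grouping, parallelism and acking.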

While searching, I found this description:

The prepare method is called when the bolt is initialised and is similar to the open method in spout. It is called only once for the bolt. It gets the configuration for the bolt and also the context of the bolt. The collector is used to emit or output the tuples from this bolt.
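The contract quoted above can be illustrated with a toy model in plain Java. It deliberately does not use the Storm API; `CountingBolt` and `LifecycleDemo` are hypothetical names. The point is that "once" means once per bolt instance: a given instance gets a single prepare() call, followed by one execute() call per tuple.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a bolt; it does NOT use the real Storm API.
// It only mirrors the described contract: prepare() once per instance,
// then execute() once per incoming tuple.
class CountingBolt {
    int prepareCalls = 0;
    final List<String> seen = new ArrayList<>();

    void prepare() {
        prepareCalls++;
    }

    void execute(String tuple) {
        seen.add(tuple);
    }
}

class LifecycleDemo {
    // Simulates one executor: prepare() runs a single time before any tuples arrive
    static CountingBolt runExecutor(List<String> tuples) {
        CountingBolt bolt = new CountingBolt();
        bolt.prepare();
        for (String t : tuples) {
            bolt.execute(t);
        }
        return bolt;
    }

    public static void main(String[] args) {
        CountingBolt bolt = runExecutor(List.of("a", "b", "c"));
        System.out.println("prepare calls: " + bolt.prepareCalls + ", tuples: " + bolt.seen.size());
    }
}
```

Running `main` prints `prepare calls: 1, tuples: 3`. Seeing prepare more than once therefore suggests that new bolt instances are being created.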

Link to a public gist with the log: Storm topology log

All answers to this question, which has the identifier 57627459

The best answer:

Your log shows that you are using LocalCluster. It is a testing/demo tool; don't use it for production workloads. Instead, set up a real distributed cluster.

Regarding what is happening:

When you run topologies in a LocalCluster, Storm simulates a real cluster by just running all the components (Nimbus, Supervisors and workers) as threads in a single JVM. Your log shows these lines:

20:14:12.451 [SLOT_1027] INFO o.a.s.ProcessSimulator - Begin killing process 2ea97301-24c9-4c1a-bcba-61008693971a

20:14:12.451 [SLOT_1027] INFO o.a.s.d.w.Worker - Shutting down worker smart-transactional-data-1-1566571315 72bbf510-c342-4385-9599-0821a2dee94e 1027

20:14:15.518 [SLOT_1027] INFO o.a.s.d.s.Slot - STATE running msInState: 33328 topo:smart-transactional-data-1-1566571315 worker:2ea97301-24c9-4c1a-bcba-61008693971a -> kill-blob-update msInState: 3001 topo:smart-transactional-data-1-1566571315 worker:2ea97301-24c9-4c1a-bcba-61008693971a

20:14:15.540 [SLOT_1027] INFO o.a.s.d.w.Worker - Launching worker for smart-transactional-data-1-1566571315

The LocalCluster is shutting down one of the simulated workers because one of the blobs in the blobstore changed (e.g. the topology jar, the topology configuration or other types of shared files; see more at https://storm.apache.org/releases/2.0.0/distcache-blobstore.html). Normally, when this happens in a real cluster, the worker JVM is killed, the blob is updated and the worker restarts. Since you are using LocalCluster, it just kills the worker thread and restarts it. The restarted worker creates fresh instances of your bolts and calls prepare on each of them again, which is why you are seeing multiple invocations of prepare.
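The restart behaviour described above can be modeled in a few lines of plain Java. This is a toy model under stated assumptions, not Storm's implementation: `ToyLocalWorker` and `ToyBolt` are hypothetical, and a "restart" simply discards the bolt instances and builds new ones. Each new instance gets its own prepare() call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical bolt used only for this toy model (not a Storm class).
class ToyBolt {
    boolean prepared = false;

    void prepare() {
        prepared = true; // where a real bolt might open connections or read config
    }
}

// Toy model of one worker running as a thread inside a single JVM; NOT the Storm API.
class ToyLocalWorker {
    private final int tasks;
    private final List<ToyBolt> bolts = new ArrayList<>();
    int prepareCalls = 0;

    ToyLocalWorker(int tasks) {
        this.tasks = tasks;
    }

    // Launching builds a fresh bolt instance per task and prepares each one
    void launch() {
        for (int i = 0; i < tasks; i++) {
            ToyBolt bolt = new ToyBolt();
            bolt.prepare();
            prepareCalls++;
            bolts.add(bolt);
        }
    }

    // Killing discards the old instances, together with any state they held
    void kill() {
        bolts.clear();
    }

    // Mirrors the log sequence: "Shutting down worker" ... "Launching worker"
    void restartAfterBlobUpdate() {
        kill();
        launch();
    }

    boolean allPrepared() {
        return bolts.stream().allMatch(b -> b.prepared);
    }

    int liveBolts() {
        return bolts.size();
    }

    public static void main(String[] args) {
        ToyLocalWorker worker = new ToyLocalWorker(1);
        worker.launch();
        worker.restartAfterBlobUpdate();
        worker.restartAfterBlobUpdate();
        System.out.println("prepare() calls: " + worker.prepareCalls);
    }
}
```

With one task, an initial launch plus two simulated restarts yields three prepare() calls. Any state a bolt builds in prepare is lost on such a restart, so it is worth writing prepare so that running it again is harmless.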
