这是在 Windows 环境下安装的 Kafka。

  • 在 pom 中添加所需依赖

编写代码

  • 编写SplitSentenceBolt
    public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;

    1. @Override
    2. public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    3. this.collector=outputCollector;
    4. }
    5. @Override
    6. public void execute(Tuple tuple) {
    7. //String sentece = tuple.getStringByField("sentence");
    8. String sentece=tuple.getString(4);
    9. String[] words = sentece.split(" ");
    10. for (String word:words){
    11. collector.emit(new Values(word));
    12. }
    13. }
    14. @Override
    15. public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    16. outputFieldsDeclarer.declare(new Fields("words"));
    17. }

    }

  • 编写WordCountBolt
    public class WordCountBolt extends BaseRichBolt {

    1. private OutputCollector collector;
    2. private HashMap<String,Long> counts =null;
    3. @Override
    4. public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    5. this.collector = outputCollector;
    6. this.counts = new HashMap<>();
    7. }
    8. @Override
    9. public void execute(Tuple tuple) {
    10. String word = tuple.getStringByField("words");
    11. // String word =tuple.getString(0);
    12. Long count=this.counts.get(word);
    13. if(count==null){
    14. count=0L;
    15. }
    16. count++;
    17. //出现就添加到map中,word相同的,会覆盖掉 所以最后的word就是准确的数据
    18. this.counts.put(word,count);
    19. this.collector.emit(new Values(word,count));
    20. }
    21. @Override
    22. public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    23. outputFieldsDeclarer.declare(new Fields("word","count"));
    24. }

    }

  • 编写ReportBolt
    public class ReportBolt extends BaseRichBolt {
    private HashMap

    1. @Override
    2. public void execute(Tuple input) {
    3. String word=input.getStringByField("word");
    4. Long count=input.getLongByField("count");
    5. this.counts.put(word, count);
    6. System.out.println("--------FINAL COUNTS--------");
    7. List<String> keys=new ArrayList<String>();
    8. keys.addAll(this.counts.keySet());
    9. Collections.sort(keys);
    10. for(String key:keys){
    11. System.out.println(key+":"+this.counts.get(key));
    12. }
    13. System.out.println("----------------------------");
    14. }
    15. @Override
    16. public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    17. }

    }

  • 编写Topology
    public class MainTopology {
    public static void main(String[] args)throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    KafkaSpoutConfig.Builder

    1. //设置kafka属于哪个组
    2. kafkabuilder.setGroupId("testgroup");
    3. //创建kafkaspoutConfig
    4. KafkaSpoutConfig<String, String> build = kafkabuilder.build();
    5. //通过kafkaspoutconfig获取kafkaspout
    6. KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(build);
    7. //设置四个线程接收数据
    8. builder.setSpout("kafkaspout",kafkaSpout,4);

    // builder.setBolt(“printBolt”, new PrintBolt(),2).localOrShuffleGrouping(“kafkaspout”);

    1. builder.setBolt("split-bolt",new SplitSentenceBolt(),2).setNumTasks(4).shuffleGrouping("kafkaspout");
    2. // 有时候我们需要将特定数据的tuple路由到特殊的bolt实例中,在此我们使用fieldsGrouping
    3. // 来保证所有"word"字段值相同的tuple会被路由到同一个WordCountBolt实例中
    4. builder.setBolt("count-bolt",new WordCountBolt(),2).fieldsGrouping("split-bolt",new Fields("words"));
    5. builder.setBolt("report-bolt",new ReportBolt()).globalGrouping("count-bolt");
    6. Config config=new Config();
    7. config.setDebug(false);
    8. config.setNumWorkers(2);
    9. LocalCluster cluster =new LocalCluster();
    10. cluster.submitTopology("kafkaspout",config,builder.createTopology());
    11. }

版权声明:本文为zhuguangzhe原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/zhuguangzhe/p/9229808.html