
kafka+storm+hbase Architecture Design

陳永正的圖書館 2016-11-18

Architecture overview: Kafka is the distributed, real-time messaging layer, with producers and consumers; Storm is the real-time processing engine; HBase is the Apache Hadoop database, with fast reads and writes.
The data produced into Kafka is consumed as the source (spout) of a Storm topology, processed by bolts, and the results are written to HBase. Concretely, the pipeline built in this post is: Kafka producer -> topic -> OrderConsumer (in-memory queue) -> OrderBaseSpout -> AreaFilterBolt -> AreaAmtBolt -> AreaRsltBolt -> HBase.

Prerequisites (not covered here):
hadoop cluster (with zookeeper)
kafka cluster
storm cluster
The HBase table the topology writes to also has to exist up front; see the sketch right after this list.
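The bolts below write their results to an HBase table named area_order with a single column family cf (the DAO's own main method also uses a table named test). A minimal sketch for pre-creating area_order, assuming the same HBase 0.96-era client API used elsewhere in this post (the class name CreateAreaOrderTable is just for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateAreaOrderTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.80.20,192.168.80.21,192.168.80.22");
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (!admin.tableExists("area_order")) {
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("area_order"));
            desc.addFamily(new HColumnDescriptor("cf"));   // the bolts write to column family "cf"
            admin.createTable(desc);
        }
        admin.close();
    }
}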

1. Kafka test API (producer and consumer)

Producer:
import java.util.Properties;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class Producer extends Thread {
    private final kafka.javaapi.producer.Producer<Integer, String> producer;
    private final String topic;
    private final Properties props = new Properties();

    public Producer(String topic) {
        props.put("serializer.class", "kafka.serializer.StringEncoder");    
        props.put("metadata.broker.list","192.168.80.20:9092,192.168.80.21:9092,192.168.80.22:9092");
        producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
        this.topic = topic;
    }

    public void run() {
        for (int i = 0; i < 2000; i++) {
            String messageStr = new String("Message_" + i);
            System.out.println("product:"+messageStr);
            producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
        }
        producer.close(); // release the producer's network resources once all test messages are sent
    }

    public static void main(String[] args) {
        Producer producerThread = new Producer(KafkaProperties.topic);
        producerThread.start();
    }
}



2. Consumer test API:
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class Consumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;

    public Consumer(String topic) {
        consumer = kafka.consumer.Consumer
                .createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        //props.put("zookeeper.session.timeout.ms", "400");
        //props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "60000");//

        return new ConsumerConfig(props);

    }
// High-level consumer API: the client still pulls from the brokers, but the blocking iterator below makes consumption feel push-like.
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
                .createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        
        while (it.hasNext()){
            // processing logic goes here
            System.out.println("consumer:"+new String(it.next().message()));
            
        }
            
    }

    public static void main(String[] args) {
        Consumer consumerThread = new Consumer(KafkaProperties.topic);
        consumerThread.start();
    }
}



3. Constants shared by the Kafka producer and consumer:
public interface KafkaProperties
{
  final static String zkConnect = "192.168.80.20:2181,192.168.80.20:2181,192.168.80.20:2181"; // note: all three entries point to .20; for the three-node cluster used elsewhere this would normally list .20, .21 and .22
  final static  String groupId = "group";
  final static String topic = "test";
}



4. HBase helper classes used by the project:


import java.util.List;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
public interface HBaseDAO {

    public void save(Put put, String tableName);
    public void insert(String tableName, String rowKey, String family, String quailifer, String value);
    public void save(List<Put> puts, String tableName);
    public Result getOneRow(String tableName, String rowKey);
    public List<Result> getRows(String tableName, String rowKeyLike);
    public List<Result> getRows(String tableName, String rowKeyLike, String cols[]);
    public List<Result> getRows(String tableName, String startRow, String stopRow);
}




import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.PrefixFilter;
// HBaseDAO (defined above) is assumed to be in the same package, so no import is needed.

public class HBaseDAOImp implements HBaseDAO{

    HConnection hTablePool = null;
    public HBaseDAOImp()
    {
        Configuration conf = HBaseConfiguration.create(); // loads hbase-default.xml/hbase-site.xml before the explicit overrides below
        conf.set("hbase.zookeeper.quorum","192.168.80.20,192.168.80.21,192.168.80.22");
        conf.set("hbase.rootdir", "hdfs://cluster/hbase");
        try {
            hTablePool = HConnectionManager.createConnection(conf) ;
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void save(Put put, String tableName) {
        HTableInterface table = null;
        try {
            table = hTablePool.getTable(tableName) ;
            table.put(put) ;
            
        } catch (Exception e) {
            e.printStackTrace() ;
        }finally{
            try {
                table.close() ;
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    @Override
    public void insert(String tableName, String rowKey, String family,
            String quailifer, String value) {
        HTableInterface table = null;
        try {
            table = hTablePool.getTable(tableName) ;
            Put put = new Put(rowKey.getBytes());
            put.add(family.getBytes(), quailifer.getBytes(), value.getBytes()) ;
            table.put(put);
        } catch (Exception e) {
            e.printStackTrace();
        }finally
        {
            try {
                table.close() ;
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    @Override
    public void save(List<Put> puts, String tableName) {
        HTableInterface table = null;
        try {
            table = hTablePool.getTable(tableName) ;
            table.put(puts) ;
        }
        catch (Exception e) {
            e.printStackTrace() ;
        }finally
        {
            try {
                table.close() ;
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        
    }


    @Override
    public Result getOneRow(String tableName, String rowKey) {
        HTableInterface table = null;
        Result rsResult = null;
        try {
            table = hTablePool.getTable(tableName) ;
            Get get = new Get(rowKey.getBytes()) ;
            rsResult = table.get(get) ;
        } catch (Exception e) {
            e.printStackTrace() ;
        }
        finally
        {
            try {
                table.close() ;
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return rsResult;
    }

    @Override
    public List<Result> getRows(String tableName, String rowKeyLike) {
        HTableInterface table = null;
        List<Result> list = null;
        try {
            table = hTablePool.getTable(tableName) ;
            PrefixFilter filter = new PrefixFilter(rowKeyLike.getBytes());
            Scan scan = new Scan();
            scan.setFilter(filter);
            ResultScanner scanner = table.getScanner(scan) ;
            list = new ArrayList<Result>() ;
            for (Result rs : scanner) {
                list.add(rs) ;
            }
        } catch (Exception e) {
            e.printStackTrace() ;
        }
        finally
        {
            try {
                table.close() ;
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return list;
    }
    
    public List<Result> getRows(String tableName, String rowKeyLike ,String cols[]) {
        HTableInterface table = null;
        List<Result> list = null;
        try {
            table = hTablePool.getTable(tableName) ;
            PrefixFilter filter = new PrefixFilter(rowKeyLike.getBytes());
            Scan scan = new Scan();
            for (int i = 0; i < cols.length; i++) {
                scan.addColumn("cf".getBytes(), cols[i].getBytes()) ;
            }
            scan.setFilter(filter);
            ResultScanner scanner = table.getScanner(scan) ;
            list = new ArrayList<Result>() ;
            for (Result rs : scanner) {
                list.add(rs) ;
            }
        } catch (Exception e) {
            e.printStackTrace() ;
        }
        finally
        {
            try {
                table.close() ;
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return list;
    }
    public List<Result> getRows(String tableName,String startRow,String stopRow)
    {
        HTableInterface table = null;
        List<Result> list = null;
        try {
            table = hTablePool.getTable(tableName) ;
            Scan scan = new Scan() ;
            scan.setStartRow(startRow.getBytes()) ;
            scan.setStopRow(stopRow.getBytes()) ;
            ResultScanner scanner = table.getScanner(scan) ;
            list = new ArrayList<Result>() ;
            for (Result rsResult : scanner) {
                list.add(rsResult) ;
            }
            
        }catch (Exception e) {
            e.printStackTrace() ;
        }
        finally
        {
            try {
                table.close() ;
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return list;
    }
    
    public static void main(String[] args) {
        HBaseDAO dao = new HBaseDAOImp();
        List<Put> list = new ArrayList<Put>();
        Put put = new Put("aa".getBytes());
        put.add("cf".getBytes(), "name".getBytes(), "zhangsan".getBytes()) ;
        put.add("cf".getBytes(), "addr".getBytes(), "beijing".getBytes()) ;
        put.add("cf".getBytes(), "age".getBytes(), "30".getBytes()) ;
        put.add("cf".getBytes(), "tel".getBytes(), "13567882341".getBytes()) ;
        list.add(put) ; // one Put carrying all four columns for row "aa"
        
        dao.save(list, "test");
//        dao.save(put, "test") ;
//        dao.insert("test", "testrow", "cf", "age", "35") ;
//        dao.insert("test", "testrow", "cf", "cardid", "12312312335") ;
//        dao.insert("test", "testrow", "cf", "tel", "13512312345") ;
        
    }

}
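One robustness note on the DAO above: every finally block calls table.close() without a null check, so if hTablePool.getTable() ever throws, the cleanup itself would throw a NullPointerException. A small helper like the one below (the name closeQuietly is just for illustration) keeps the cleanup safe; each finally block could then simply call closeQuietly(table).

    // null-safe close, intended to live inside HBaseDAOImp and be called from the finally blocks above
    private static void closeQuietly(HTableInterface table) {
        if (table == null) {
            return;
        }
        try {
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }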



Now for the actual project code.
1) The Kafka producer:
import java.util.Properties;
import java.util.Random;
// DateFmt (shown below) is assumed to be in the same package, so no import is needed.
import backtype.storm.utils.Utils;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Producer extends Thread {
    private final kafka.javaapi.producer.Producer<Integer, String> producer;
    private final String topic;
    private final Properties props = new Properties();

    public Producer(String topic) {
        props.put("serializer.class", "kafka.serializer.StringEncoder");// 字符串消息
        props.put("metadata.broker.list", "192.168.80.20:9092,192.168.80.21:9092,192.168.80.22:9092");
        producer = new kafka.javaapi.producer.Producer<Integer, String>( new ProducerConfig(props));
        this.topic = topic;
    }

    public void run() {
        // order_id,order_amt,create_time,area_id
        Random random = new Random();
        String[] order_amt = { "10.10", "20.10", "30.10","40.0", "60.10" };
        String[] area_id = { "1","2","3","4","5" };
        
        int i =0 ;
        while(true) {
            i ++ ;
            String messageStr = i+"\t"+order_amt[random.nextInt(5)]+"\t"+DateFmt.getCountDate(null, DateFmt.date_long)+"\t"+area_id[random.nextInt(5)] ;
            System.out.println("product:"+messageStr);
            producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
            //Utils.sleep(1000) ; // uncomment to throttle to roughly one message per second

        }

    }

    public static void main(String[] args) {
        Producer producerThread = new Producer(KafkaProperties.topic);
        producerThread.start();
    }
}



2) The date/time helper class used by the producer and the bolts:
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class DateFmt {

    public static final String date_long = "yyyy-MM-dd HH:mm:ss" ;
    public static final String date_short = "yyyy-MM-dd" ;
    
    public static SimpleDateFormat sdf = new SimpleDateFormat(date_short); // note: SimpleDateFormat is not thread-safe; acceptable for this demo
    
    public static String getCountDate(String date, String pattern)
    {
        SimpleDateFormat sdf = new SimpleDateFormat(pattern);
        Calendar cal = Calendar.getInstance(); 
        if (date != null) {
            try {
                cal.setTime(sdf.parse(date)) ;
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
        return sdf.format(cal.getTime());
    }
    
    public static String getCountDate(String date, String pattern, int step)
    {
        SimpleDateFormat sdf = new SimpleDateFormat(pattern);
        Calendar cal = Calendar.getInstance(); 
        if (date != null) {
            try {
                cal.setTime(sdf.parse(date)) ;
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
        cal.add(Calendar.DAY_OF_MONTH, step) ;
        return sdf.format(cal.getTime());
    }
    
    public static Date parseDate(String dateStr) throws Exception
    {
        return sdf.parse(dateStr);
    }
    
    public static void main(String[] args) throws Exception{
    System.out.println(DateFmt.getCountDate(null, DateFmt.date_short));
    //System.out.println(DateFmt.getCountDate("2014-03-01 12:13:14", DateFmt.date_short));

    //System.out.println(parseDate("2014-05-02").after(parseDate("2014-05-01")));
    }

}



3) The project's Kafka consumer:


The consumed messages are pushed into a queue so that they can serve as the data source for the Storm spout; this hand-off is the key piece of the integration.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.productor.KafkaProperties;

public class OrderConsumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;

    private Queue<String> queue = new ConcurrentLinkedQueue<String>() ; // thread-safe FIFO queue shared with the spout
    
    public OrderConsumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");//zookeeper offset偏移量

        return new ConsumerConfig(props);

    }
// High-level consumer API: the client still pulls from the brokers, but the blocking iterator below makes consumption feel push-like.
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
                .createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        
        while (it.hasNext()){
            // read each message exactly once, then hand it to the spout via the queue
            String msg = new String(it.next().message());
            System.out.println("consumer:" + msg);
            queue.add(msg) ;
            System.err.println("queue ----->" + queue);
        }
            
    }

    public Queue<String> getQueue()
    {
        return queue ;
    }
    
    public static void main(String[] args) {
        OrderConsumer consumerThread = new OrderConsumer(KafkaProperties.Order_topic); // Order_topic is not shown in the KafkaProperties above; KafkaProperties.topic also works for this demo
        consumerThread.start();
      
    }
}
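A note on the hand-off queue above: ConcurrentLinkedQueue is unbounded, so if the spout falls behind the consumer the queue can grow without limit. If that matters, a bounded queue is a simple alternative; here is a sketch (the field name boundedQueue, the helper enqueue and the capacity of 10000 are all just for illustration):

    // bounded alternative to the unbounded ConcurrentLinkedQueue field above
    private final java.util.concurrent.BlockingQueue<String> boundedQueue =
            new java.util.concurrent.LinkedBlockingQueue<String>(10000);

    // and inside run(), instead of queue.add(msg):
    private void enqueue(String msg) {
        if (!boundedQueue.offer(msg)) {         // offer() returns false once the queue is full
            System.err.println("queue full, dropping message: " + msg);
        }
    }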



4) The Storm part: spout and bolts
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import kafka.consumers.OrderConsumer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class OrderBaseSpout implements IRichSpout {

    String topic = null;
    public OrderBaseSpout(String topic)
    {
        this.topic = topic ;
    }
    /**
     * Common base spout: feeds messages from the OrderConsumer queue into the topology.
     */
    private static final long serialVersionUID = 1L;
    Integer TaskId = null;
    SpoutOutputCollector collector = null;
    Queue<String> queue = new ConcurrentLinkedQueue<String>() ;
    
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("order")) ;
    }

    public void nextTuple() {
        if (queue.size() > 0) {
            String str = queue.poll() ;
            // data filtering could be applied here before emitting downstream
            System.err.println("TaskId:"+TaskId+";  str="+str);
            collector.emit(new Values(str)) ;
        }
    }

    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        this.collector = collector ;
        TaskId = context.getThisTaskId() ;
//        Thread.currentThread().getId()
        OrderConsumer consumer = new OrderConsumer(topic) ;
        consumer.start() ;
        queue = consumer.getQueue() ;
    }

    
    public void ack(Object msgId) {
        // no reliable-processing logic in this simple example
    }

    public void activate() {
    }

    public void close() {
    }

    public void deactivate() {
    }

    public void fail(Object msgId) {
        // failed tuples are simply dropped in this example
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
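One thing to note: when the queue is empty, nextTuple() returns immediately and Storm calls it again in a tight loop, which burns CPU. A common courtesy is to back off briefly on an empty queue; a sketch of an alternative nextTuple() body, using backtype.storm.utils.Utils which is already on the classpath:

    public void nextTuple() {
        String str = queue.poll();
        if (str != null) {
            System.err.println("TaskId:" + TaskId + ";  str=" + str);
            collector.emit(new Values(str));
        } else {
            // nothing to emit right now; sleep briefly instead of spinning
            backtype.storm.utils.Utils.sleep(10);
        }
    }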




Storm now has its source data; how should it be processed? That depends on your own business logic. The processing here is kept deliberately simple, just enough to complete the end-to-end flow.
There are three bolts:


import java.util.Map;
// DateFmt (defined above) is assumed to be in the same package, so no import is needed.
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class AreaFilterBolt implements IBasicBolt {

    private static final long serialVersionUID = 1L;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("area_id","order_amt","order_date"));
        
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // nothing to initialize for this stateless bolt
        
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String order = input.getString(0);
        if(order != null){
            String[] orderArr = order.split("\\t");
            // emit (area_id, order_amt, order_date); e.g. the record "17\t20.10\t2016-11-18 10:21:33\t3" becomes ("3", "20.10", "2016-11-18")
            collector.emit(new Values(orderArr[3],orderArr[1],DateFmt.getCountDate(orderArr[2], DateFmt.date_short)));
            System.out.println("--------------》"+orderArr[3]+orderArr[1]);
        }
        
    }

    @Override
    public void cleanup() {
        
    }

}






import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
// HBaseDAO, HBaseDAOImp and DateFmt (defined above) are assumed to be in the same package, so no imports are needed.

public class AreaAmtBolt  implements IBasicBolt{


    private static final long serialVersionUID = 1L;
    Map <String,Double> countsMap = null ;
    String today = null;
    HBaseDAO dao = null;
    
    @Override
    public void cleanup() {
        // called when the bolt shuts down; drop the in-memory counters
        countsMap.clear() ;
    }
    
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date_area","amt")) ;    
        
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        countsMap = new HashMap<String, Double>() ;
        dao = new HBaseDAOImp() ;
        // seed countsMap with the totals already stored in HBase for today, so a restart does not reset the counters
        today = DateFmt.getCountDate(null, DateFmt.date_short);
        countsMap = this.initMap(today, dao);
        for(String key:countsMap.keySet())
        {
            System.err.println("key:"+key+"; value:"+countsMap.get(key));
        }
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        if (input != null) {
            String area_id = input.getString(0) ;
            double order_amt = 0.0;
            //order_amt = input.getDouble(1) ;
            try {
                order_amt = Double.parseDouble(input.getString(1)) ;
            } catch (Exception e) {
                System.out.println(input.getString(1)+":---------------------------------");
                e.printStackTrace() ;
            }
            
            String order_date = input.getStringByField("order_date") ;
            
            if (! order_date.equals(today)) {
                // day rollover: clear yesterday's counters
                countsMap.clear() ;
            }
            
            Double count = countsMap.get(order_date+"_"+area_id) ;
            if (count == null) {
                count = 0.0 ;
            }
            count += order_amt ;
            countsMap.put(order_date+"_"+area_id, count) ;
            System.err.println("areaAmtBolt:"+order_date+"_"+area_id+"="+count);
            collector.emit(new Values(order_date+"_"+area_id,count)) ;
            System.out.println("***********"+order_date+"_"+area_id+count);
        }
        
    }
    
    public Map<String, Double> initMap(String rowKeyDate, HBaseDAO dao)
    {
        Map <String,Double> countsMap = new HashMap<String, Double>() ;
        List<Result> list = dao.getRows("area_order", rowKeyDate, new String[]{"order_amt"});
        for(Result rsResult : list)
        {
            String rowKey = new String(rsResult.getRow());
            for(KeyValue keyValue : rsResult.raw())
            {
                if("order_amt".equals(new String(keyValue.getQualifier())))
                {
                    countsMap.put(rowKey, Double.parseDouble(new String(keyValue.getValue()))) ;
                    break;
                }
            }
        }
        
        return countsMap;
        
    }
    



}




import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
// HBaseDAO and HBaseDAOImp (defined above) are assumed to be in the same package, so no imports are needed.
public class AreaRsltBolt implements IBasicBolt
{


    private static final long serialVersionUID = 1L;
    Map <String,Double> countsMap = null ;
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        dao = new HBaseDAOImp() ;
        countsMap = new HashMap<String, Double>() ;
    }

    HBaseDAO dao = null;
    long beginTime = System.currentTimeMillis() ;
    long endTime = 0L ;
    
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String date_areaid = input.getString(0);
        double order_amt = input.getDouble(1) ;
        countsMap.put(date_areaid, order_amt) ;
        endTime = System.currentTimeMillis() ;
        if (endTime - beginTime >= 5 * 1000) {
            for(String key : countsMap.keySet())
            {
                // flush the accumulated totals to HBase roughly every 5 seconds
                dao.insert("area_order", key, "cf", "order_amt", countsMap.get(key)+"") ;
                System.err.println("rsltBolt put hbase: key="+key+"; order_amt="+countsMap.get(key));
            }
            beginTime = endTime ; // restart the 5-second window so the map is not rewritten on every subsequent tuple
        }
    }

    @Override
    public void cleanup() {
        
    }

}

Finally, the main method that wires up the topology:

import kafka.productor.KafkaProperties;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
// AreaAmtBolt, AreaFilterBolt, AreaRsltBolt and OrderBaseSpout (defined above) are assumed to be in the same package, so no imports are needed.

public class MYTopology {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new OrderBaseSpout(KafkaProperties.topic), 5);
        builder.setBolt("filterblot", new AreaFilterBolt() , 5).shuffleGrouping("spout") ;
        builder.setBolt("amtbolt", new AreaAmtBolt() , 2).fieldsGrouping("filterblot", new Fields("area_id")) ;
        builder.setBolt("rsltolt", new AreaRsltBolt(), 1).shuffleGrouping("amtbolt");
        
        
        Config conf = new Config() ;
        conf.setDebug(false);
        if (args.length > 0) {
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        }else {
            // local test mode
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("mytopology", conf, builder.createTopology());
        }
        
        
    }

}
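One last note: in local mode the topology keeps running until the JVM is killed. If you want the local test to stop on its own, a common pattern (a sketch; the 60-second run time is arbitrary) is to sleep in main and then shut the LocalCluster down:

            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("mytopology", conf, builder.createTopology());
            backtype.storm.utils.Utils.sleep(60 * 1000);   // let the topology run for 60 seconds
            localCluster.killTopology("mytopology");       // stop the topology ...
            localCluster.shutdown();                       // ... and the embedded cluster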






That completes the basic architecture. Learning Kafka, Storm or HBase individually is not hard; the real difference comes from wiring them together.

My blog also covers the basics of Kafka, Storm and HBase if you want more background.




