數據處理選kafka還是RocketMQ?

                        小編:管理員 148閱讀 2022.07.29

                        場景描述:北京有很多電動車,這些車都會定時地向一個服務器發送狀態信息,這些信息可能包括:車的id、發送 時間、車的位置(經緯度)、車的速度、剩余電量等等。有了這些信息我們可以做很多事情,比如:計算車 的軌跡、出租車的運行規律、電量維持時間等等。

                        一、kafka到底在怎樣的應用場景下使用?

                        在類似這樣的場景下,項目開發中的數據量很大,一天上千萬,最初,數據存在HBase,我們想替換掉HBase ,原因如下:

                        1、數據量大了后,HBase運維成本很高

                        2、數據統計一般在Hive中進行,導致數據有一天的延時

                        那么可實行的方案就是:用Kafka兜住熱數據,然后定時以 microbatch 的方式將數據落地到HDFS

                        效果演示

                        回退環境

                        MQ 選型

                        問:RocketMQ 異常優秀。是不是直接選用 RocketMQ?

                        答:RocketMQ 是在 Kafka 的基礎上重寫的,保留了 Kafka durable 機制、集群優勢,犧牲了一些 吞吐量,換取了更好的 數據可靠性。我們這個場景要求的就是吞吐量。

                        Kafka 更適合密集的數據,RocketMQ適合稀疏的數據:

                        結論:

                        業務場景:用RocketMQ

                        數據場景:1、一般用 Kafka,2個例外:

                        》若有大量小 Topic,用 RocketMQ

                        》若對數據可靠性要求極高,用 RocketMQ

                        二、Kafka 基礎1 Topic

                        Kafka對數據進行劃分唯一的邏輯單元

                        2 、架構速覽

                        問:這樣的架構,能否保證 Topic 中數據的順序?

                        三、Kafka集群搭建

                        要進行這樣一個方案,我們首先需要一個Kafka集群,畢竟巧婦難為無米之炊

                        現在就帶著搭建一個生產級別的Kafka

                        今天帶著大家全手動搭建集群,這樣可以對集群原理有更好的認識

                        1、 安裝JDK8

                        JDK自行解決

                        2、 ZK 安裝

                        Kafka的元數據全部放在ZK上,Kafka強依賴ZK,所以PROD上轉kafka,要先裝ZK

                        1 2 3 4 5 6 7 8 9

                        #統一各機器的時鐘 date -s 'Fri Nov 1 11:17:46 CST 2019' #上傳安裝包 #解壓縮 tar -zxvf kafka_2.11-2.2.1.tgz tar -zxvf zookeeper-3.4.13.tar.gz #創建數據目錄 mkdir -p data/zookeeper/ mkdir -p data/kafka

                        cp zoo_sample.cfg zoo.cfg

                        vi zoo.cfg

                        1 2 3 4 5 6 7 8 9 10 11 12 13 14

                        # The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 syncLimit=5 # example sakes. dataDir=/home/zk/data/zookeeper #change # the port at which the clients will connect clientPort=2181 server.1=192.168.90.131:8880:7770 #add server.2=192.168.90.132:8880:7770 #add server.3=192.168.90.133:8880:7770 #add

                        1 2 3 4 5

                        #創建日志目錄 mkdir -p /home/zk/zookeeper-3.4.13/logs #指定日志目 vi zkEnv.sh 添加如下行: ZOO_LOG_DIR=/home/zk/zookeeper-3.4.13/logs

                        1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18

                        #分發 安裝包 cd /home/zk/ scp -r zookeeper-3.4.13 192.168.90.132:`pwd` scp -r zookeeper-3.4.13 192.168.90.133:`pwd` #每臺機器配置 myid cd /home/zk/data/zookeeper/ echo "1" > myid #在第1臺機器執行 echo "2" > myid #在第2臺機器執行 echo "3" > myid #在第3臺機器執行 #啟動ZK,每臺機器執行: cd /home/zk/zookeeper-3.4.13 bin/zkServer.sh start #檢查集群狀態 bin/zkServer.sh status 集群狀態為 leader 或 follower,則集群正常

                        3、Kafka 安裝

                        1 2 3

                        #分發kafka安裝包 scp -r kafka_2.11-2.2.1 192.168.90.132:`pwd` scp -r kafka_2.11-2.2.1 192.168.90.133:`pwd

                        修改 每臺機器,confifig/server.properties

                        1 2 3 4

                        broker.id=0 其他機器改為為1、2 log.dir=/home/zk/data/kafka listeners=PLAINTEXT://zkserver1:9092 zkserver1改為其他機器相應的 hostname

                        1 2

                        啟動kafka,每臺機器執行: bin/kafka-server-start.sh config/server.properties &

                        5、測試Kafka

                        1 2 3 4 5 6

                        #創建topic bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 3 --replication-factor 2 #生產 bin/kafka-console-producer.sh --broker-list 192.168.90.131:9092 --topic test #消費 bin/kafka-console-consumer.sh --bootstrap-server 192.168.90.131:9092 --topic test

                        四、producer端1、 創建項目

                        創建項目,指定 compiler

                        1 2 3 4

                        <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties>

                        2、確定數據結構
                        import java.sql.Date;
                        
                        public class Electrocar {
                            private String id;
                        
                            //數據發送時間
                            private Date time;
                        
                            //經度
                            private double longitude;
                            private double latitude;
                        
                            //速度
                            private double speed;
                        
                            //剩余電量
                            private double dump_energy;
                        
                        
                            //構造函數,用于快速構造數據
                            public Electrocar(String id,
                                              Date time,
                                              double longitude,
                                              double latitude,
                                              double speed,
                                              double dump_energy){
                                this.id = id;
                                this.time = time;
                                this.longitude = longitude;
                                this.speed = speed;
                                this.dump_energy = dump_energy;
                            }
                        
                        
                            //生成getter方法,不生成setter方法
                        
                            public String getId() {
                                return id;
                            }
                        
                            public Date getTime() {
                                return time;
                            }
                        
                            public double getLongitude() {
                                return longitude;
                            }
                        
                            public double getLatitude() {
                                return latitude;
                            }
                        
                            public double getSpeed() {
                                return speed;
                            }
                        
                            public double getDump_energy() {
                                return dump_energy;
                            }
                        
                        }
                        復制2、生成數據

                        1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29

                        public class CarDataSource { public static void main(String args[]) throws InterruptedException { while (true){ ElectroCar car = nextRecord(); //生成數據 System.out.println(String.format("%s|%f|%f", car.getId(), car.getLatitude(), car.getLongitude())); Thread.sleep(200); } } public static ElectroCar nextRecord(){ //定義random,用于生成隨機值 Random random = new Random(); //構建 ElectroCar對象 ElectroCar car = new ElectroCar( random.nextInt(10) + "", new Date(System.currentTimeMillis()), random.nextFloat(), random.nextFloat(), random.nextFloat(), random.nextFloat() ); return car; } }

                        3、producer 官網示例

                        1 2 3 4 5

                        <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.2.0</version> </dependency>

                        1 2 3 4 5 6 7 8 9 10 11

                        Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); producer.close();

                        4 創建topic

                        1 2 3 4 5

                        bin/kafka-topics.sh --create --bootstrap-server 192.168.90.131:9092 --replication-factor 3 --partitions 3 --topic electrocar

                        5 數據格式

                        思考:應該以什么格式將數據 publish 到 Kafka? json不好, 要用二進制

                        ObjectBinary測試

                        1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44

                        public class ObjectBinaryUtil { public static void main(String args[]){ Electrocar car = CarDataSource.nextRecord(); byte[] arr = null; //將Car obj output 為byte[] //ByteArray輸出 ByteArrayOutputStream bos = new ByteArrayOutputStream(); try { //將oos輸出到bos ObjectOutputStream oos = new ObjectOutputStream(bos); //對象輸出到oos oos.writeObject(car); //獲取byte[] arr = bos.toByteArray(); System.out.println("arr.length :" + arr.length); } catch (IOException e) { e.printStackTrace(); } //將byte[] 轉成 obj //接受arr輸入 ByteArrayInputStream bis = new ByteArrayInputStream(arr); try { //bis 轉為ObjectInput ObjectInputStream ois = new ObjectInputStream(bis); //從ObjectInput 讀取Obj Electrocar car1 = (Electrocar) ois.readObject(); System.out.println("++++" + car.getLatitude()); } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } }

                        ObjectBinearyUtil 封裝

                        1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73

                        //Object to byte[] public static byte[] toBinary(Object obj){ //將Car obj output 為byte[] //ByteArray輸出 ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream oos = null; try { //將oos輸出到bos oos = new ObjectOutputStream(bos); //對象輸出到oos oos.writeObject(obj); //獲取byte[] return bos.toByteArray(); } catch (IOException e) { e.printStackTrace(); }finally { if (bos !=null){ try { bos.close(); } catch (IOException e) { e.printStackTrace(); } } if (oos !=null){ try { oos.close(); } catch (IOException e) { e.printStackTrace(); } } } return null; } //byte[] to Object public static Object toObject(byte[] arr){ //將byte[] 轉成 obj //接受arr輸入 ByteArrayInputStream bis = new ByteArrayInputStream(arr); ObjectInputStream ois = null; try { //bis 轉為ObjectInput ois = new ObjectInputStream(bis); //從ObjectInput 讀取Obj return ois.readObject(); } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); }finally { if (bis!=null){ try { bis.close(); } catch (IOException e) { e.printStackTrace(); } } if (ois !=null){ if (ois !=null){ try { ois.close(); } catch (IOException e) { e.printStackTrace(); } } } } return null; }

                        6、消息順序

                        思考:消息的順序丟失了,怎么辦? 將相同id的數據放到同一個partition

                        1 2 3 4 5 6 7 8 9 10 11 12 13

                        while (true){ Electrocar car = nextRecord(); byte[] carBinary = ObjectBinaryUtil.toBinary(car); ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>( "electrocar", car.getId(), //通過傳入carId,來保證消息的順序 carBinary); producer.send(record); Thread.sleep(200); System.out.println("published..."); }

                        五、consumer 傳統方式

                        group.id

                        Kafka 中有一個消費者集群的概念,我們將其稱之為consumer group。

                        auto.commit

                        1、問:consumer 重啟時,應該從何處開始繼續消費?

                        答:從關閉時的 offset開始消費,這就要 實時記錄消費進度

                        2、enable.auto.commit=true時,由 consumer 自動提交,false時手動提交

                        1

                        consumer.commitAsync(); //手動提交API

                        3、問: offset 提交到哪里了呢?

                        答:在 offset早期,提交到ZK,提交到系統級別的topic

                        4、存在數據數據一致性問題

                        能夠理解的同學扣個1,不理解的扣個2

                        exactly-once 方案方案總述消費kafka
                        //創建 demo2
                        
                        //實例化consumer從demo1處拷貝
                        
                        //修改數據類型 
                        KafkaConsumer<String, byte[]> consumer
                        ByteArrayDeserializer
                        
                        //沒有 commit offset,不能用subscribe 方法
                                List<TopicPartition> partitions = new ArrayList<>();
                                for (int i=0; i<3; i++){
                                    //構建partition 對象
                                    TopicPartition p = new TopicPartition(topic, i);
                                    partitions.add(p);
                                }
                        
                                //指定,當前consuer具體消費哪幾個paritions
                                consumer.assign(partitions);
                        復制seek到具體Offset

                        重啟consumer時,要從MySQL中獲取offset,

                        根據該offset開始消費 toipic,

                        就要知道如何跳轉到 具體的 offset

                        for (TopicPartition p : partitions){
                                    consumer.seek(p, 20);       //將partition seek到具體的offset開始消費
                                }
                        復制建MySQL表

                        1 2 3 4 5 6 7 8 9 10 11

                        CREATE TABLE `electrocar` ( `topic` varchar(20) DEFAULT NULL, `pid` int(11) DEFAULT NULL, `offset` mediumtext, `id` int(11) DEFAULT NULL, `timestamp` date DEFAULT NULL, `longitude` float DEFAULT NULL, `latitude` float DEFAULT NULL, `speed` float DEFAULT NULL, `dump_energy` float DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8

                        落地數據

                        1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27

                        //引入JdbcHelper #創建連接 JdbcHelper jdbcHelper = new JdbcHelper("jdbc:mysql://192.168.90.131:3306/kafka", "kafka", "kafka"); Connection conn = jdbcHelper.getConnection(); System.out.println("MySQL conn inited..."); Statement stat = null; //創建會話 try { stat = conn.createStatement(); while (true) { //循環執行poll方法 //到服務端拉取消息,得到一個集合 ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100)); if (records.count() >0){ //有消息,才insert //將records 轉成 批量插入的SQL語句 String sql = records2SQL(records); stat.execute(sql); System.out.println("inserted..."); }else { System.out.println("no record..."); } } } catch (SQLException e) { e.printStackTrace(); }

                        records轉SQL
                        public static String records2SQL(ConsumerRecords<String, String> records){
                                StringBuilder sb = new StringBuilder();
                        
                                sb.append("INSERT INTO kafka.electrocar VALUES ");
                        
                                Iterator itr = records.iterator();
                        
                                while (itr.hasNext()){
                                    ConsumerRecord<String, byte[]> record = (ConsumerRecord<String, byte[]>)itr.next();
                                    Electrocar car = (Electrocar) ObjectBinaryUtil.toObject(record.value());
                        
                                    String strDateFormat = "yyyy-MM-dd HH:mm:ss";
                                    SimpleDateFormat sdf = new SimpleDateFormat(strDateFormat);
                                    String time = sdf.format(car.getTime());
                        
                                    String sqlPiece = String.format("('%s',%d,%d,%s,'%s',%f,%f,%f,%f)",
                                            record.topic(),
                                            record.partition(),
                                            record.offset(),
                                            car.getId(),
                                            time,
                                            car.getLongitude(),
                                            car.getLatitude(),
                                            car.getSpeed(),
                                            car.getDump_energy());
                        
                                    sb.append(sqlPiece);
                        
                                    if (itr.hasNext()){
                                        sb.append(",");
                                    }
                        
                                }
                        
                                //System.out.println(sb.toString());
                                return sb.toString();
                            }
                        復制
                        <dependency>
                              <groupId>mysql</groupId>
                              <artifactId>mysql-connector-java</artifactId>
                              <version>5.1.25</version>
                            </dependency>
                        
                            import com.mysql.jdbc.Driver;
                        復制封裝成通用工具

                        1、創建 ExactOnceConsumer

                        現在還只是一個demo,只能用于electrocar topic的消費,現在我們將其封裝成一個小框架,讓他能夠經過極少量的開發,就能消費其他的topic

                        2、重構

                        關聯標簽:
                        少妇各种各样BBBⅩXX