实时数仓之数据采集

日志数据采集

模拟日志生成器使用

对于日志数据,我们采用写好的一个模拟生成日志数据的jar包,运行jar包可以将日志发送给服务器上某一个指定的端口,需要我们大数据程序员从指定端口接收数据并对数据进行处理。

前端埋点数据通过nginx做负载均衡转发给日志服务器。这里我们首先通过编写soringboot代码模拟后台在本地做测试,之后再将项目打成jar包上传服务器测试,之后再搭建nginx测试测试负载均衡。

模拟web app前端埋点生成数据

1
2
3
4
5
6
7
(base) [dw@hadoop116 module]$ mkdir gmall-flink
(base) [dw@hadoop116 module]$ cd gmall-flink/
(base) [dw@hadoop116 gmall-flink]$ ls
(base) [dw@hadoop116 gmall-flink]$ mkdir rt_applog
# 上传数据
(base) [dw@hadoop116 rt_applog]$ ls
application.yml gmall2020-mock-log-2020-12-18.jar

测试jar包:

这里要注意,8080端口未监听,如果你启动zk,zk默认是8080端口,虽然这样有监听,但是会报错404,无响应。

1
2
3
(base) [dw@hadoop116 rt_applog]$ java -jar gmall2020-mock-log-2020-12-18.jar

java.net.ConnectException: Failed to connect to localhost/0:0:0:0:0:0:0:1%1:8080

日志采集模块-本地测试

SpringBoot简介

(大数据程序员,要会写数据接口。给你一个请求,我要什么东西,一个json格式,去访问数据库,最后把数据加工成要的json格式返回。)

SpringBoot 是由 Pivotal 团队提供的全新框架,其设计目的是用来简化新 Spring应用的初始搭建以及开发过程。 该框架使用了特定的方式来进行配置,从而使开发人员不再需要定义样板化的配置。

SpringBoot分为Controller、Service、DAO(如果用Mybaits就称为MApper)、持久化层(DAO工艺Mapper去读取写入数据的地方)。

  • controller:拦截用户的请求,调用service,响应请求 (相当于服务员)
  • service:调用DIO层,加工数据 (相当于厨房)
  • DAO:获取数据 (相当于采购)
  • 持久化层:存储数据 (相当于菜市场)

分层复用,模块化、结构化开发,便于维护。

搭建springboot

1.创建一个新项目:

2.创建一个模块

这里由于该插件只有在当前安装了插件的idea里会自动帮忙生成get、set方法,所以不适合代码协作。这里要注意,除了安装lombok的依赖项,还要到插件plugins里安装lombok插件。

编写测试代码

在SpringBoot三层里,必不可少的是controller,我们编写controller的代码进行测试。(可以将services和DAO的代码写到controller,不过一般不这么写)

创建一个controller包:

编写一个loggercontroller:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.everweekup.gmalllogger.controller;

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

/**
* @author jabari
* @date 2021年11月08日 15:54
*/

@Controller
public class LoggerController {

// 添加一个映射叫test, 请求要和映射test保持一致
@RequestMapping("test")
public String test1(){
System.out.println("success");
return "success";
}

}

启动程序测试:

访问页面:

修改返回success.html,创建一个网页,重启程序访问:

访问成功:

如果只想让其返回java对象,有两种方式,第一种:

加了这个注释后,下面所有方法返回的对象都是普通的java对象。

第二种方式:

继续编写一个请求传入参数的方法:

1
2
3
4
5
6
7
8
9
@RequestMapping("test2")
// @RequestParam("name") String nn URL请求参数name传入,用nn接收该参数
// @RequestParam(value = "age", defaultValue = "18") int age 由默认值的参数可以不传
@ResponseBody
public String test2(@RequestParam("name") String nn,
@RequestParam(value = "age", defaultValue = "18") int age){
System.out.println("success");
return nn + ":" + age + "success";
}

请求里传入默认值:

开始编写接收模拟请求代码

先查看请求地址配置文件:

1
2
3
4
5
(base) [dw@hadoop116 rt_applog]$ pwd
/opt/module/gmall-flink/rt_applog
(base) [dw@hadoop116 rt_applog]$ ls
application.yml gmall2020-mock-log-2020-12-18.jar logs
(base) [dw@hadoop116 rt_applog]$ vim application.yml

开始编写方法:

1
2
3
4
5
6
7
8
9
10
11
12
@RequestMapping("applog")
public String getLog(@RequestParam("param") String jsonStr){

// 测试数据流是否成功 打印数据
System.out.println(jsonStr);

// 将数据落盘

// 将数据写入Kafka

return "success";
}

启动程序,修改请求发生配置文件的ip地址为本机地址:

虚拟机生成数据:

1
(base) [dw@hadoop116 rt_applog]$ java -jar gmall2020-mock-log-2020-12-18.jar

idea终端输出:

虚拟机终端输出:

开始编写落盘代码

使用日志打印的方式,将数据写入到磁盘。

1.修改springboot配置

2.配置日志打印所需的xml配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- 定义了一个变量-->
<property name="LOG_HOME" value="g:/Desktop/obsidian_aliyun_sync/code_repo/logs" />
<!-- 打印到控制台-->
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<!-- 每输出一条消息换行-->
<pattern>%msg%n</pattern>
</encoder>
</appender>
<!-- 打印日志,通过滚动日志的形式写入文件-->
<appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 数据输出到此路径下-->
<file>${LOG_HOME}/app.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern>
</rollingPolicy>
<encoder>
<pattern>%msg%n</pattern>
</encoder>
</appender>
<!-- 将某一个包下日志单独打印 -->
<!-- 注意这里的LoggerController是日志打印的目标类,只打印该类下的日志数据-->
<logger name="com.everweekup.gmall.controller.LoggerController" level="INFO" additivity="false">
<!-- 引用日志打印方式-->
<appender-ref ref="rollingFile" />
<appender-ref ref="console" />
</logger>

<root level="error" additivity="false">
<appender-ref ref="console" />
</root>
</configuration>

3.给代码添加log对象

注意这里引入的log对象是lombok包下的

4.编写log打印和kafka对接代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package com.everweekup.gmall.controller;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.protocol.types.Field;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

/**
* @author jabari
* @date 2021年11月08日 16:48
*/

//@Controller
@Slf4j
@RestController // 使得返回值是一个普通的java对象
public class LoggerController {

// 通过该注解,使其自动构建kafkaTemplate对象
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

// 添加一个映射叫test, 请求要和映射test保持一致
@RequestMapping("test")
// @ResponseBody
public String test1(){
System.out.println("success");
return "success.html";
}

@RequestMapping("test2")
// @RequestParam("name") String nn URL请求参数name传入,用nn接收该参数
// @RequestParam(value = "age", defaultValue = "18") int age 由默认值的参数可以不传
// @ResponseBody
public String test2(@RequestParam("name") String nn,
@RequestParam(value = "age", defaultValue = "18") int age){
System.out.println("success");
return nn + ":" + age + "success";
}

@RequestMapping("applog")
public String getLog(@RequestParam("param") String jsonStr){

// 测试数据流是否成功 打印数据
// System.out.println(jsonStr); 后面的log模块配置了日志打印到控制台所以这里就不需要了

// 将数据落盘
// 用info写入的数据就是info数据
// log.debug();
log.info(jsonStr);
// log.warn();
// log.error();
// log.trace(); 级别更高,更严重

// 将数据写入Kafka 主题, 数据
kafkaTemplate.send("ods_base_log", jsonStr);

return "success";
}
}

开始测试

1.启动kafka

先启动zk,再启动kafka集群:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
(base) [dw@hadoop116 module]$ zkS.sh start
---------- zookeeper hadoop116 启动 ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cf g
Starting zookeeper ... STARTED
---------- zookeeper hadoop117 启动 ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cf g
Starting zookeeper ... STARTED
---------- zookeeper hadoop118 启动 ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cf g
Starting zookeeper ... STARTED

(base) [dw@hadoop116 kafka]$ pwd
/opt/module/kafka
(base) [dw@hadoop116 kafka]$ kafka.sh start
--------启动 hadoop116 Kafka-------
--------启动 hadoop117 Kafka-------
--------启动 hadoop118 Kafka-------

2.创建主题

主题没创建直接消费该主题会报一个警告,但是可以往该主题发,默认配置是会自动创建主题,如果没创建的话。

1
2
# 创建一个副本数1,分区数10的名为ods_base_log的topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 10 --topic ods_base_log

这里我们不创建topic,直接启动kafka consumer,尝试运行下:

1
(base) [dw@hadoop116 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop116:9092 --topic ods_base_log

3.启动服务端

4.启动jar包

1
(base) [dw@hadoop116 rt_applog]$ java -jar gmall2020-mock-log-2020-12-18.jar

日志生成页面:

本地idea终端:

查看kafka消费者:

数据都在滚动生成。

检查时发现,kafka中没找到ods_base_log这个主题,那么数据生成保存到哪里去了?

模拟日志采集模块-集群测试

开始打包,单机部署

修改路径,因为要在linux上使用。

开始打包:

1
2
3
4
(base) [dw@hadoop116 rt_applog]$ mv gmall2021-logger-0.0.1-SNAPSHOT.jar gmall-logger.jar
(base) [dw@hadoop116 rt_applog]$
(base) [dw@hadoop116 rt_applog]$ ls
application.yml gmall2020-mock-log-2020-12-18.jar gmall-logger.jar logs
集群测试

上传到集群后,修改日志生成代码的目标IP地址

1.先启动web服务

1
2
3
4
5
6
7
8
9
10
11
12
(base) [dw@hadoop116 rt_applog]$ ls
application.yml gmall2020-mock-log-2020-12-18.jar gmall-logger.jar logs
(base) [dw@hadoop116 rt_applog]$ java -jar gmall-logger.jar

. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.3.7.RELEASE)

2.启动kafka消费者

1
(base) [dw@hadoop116 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop116:9092 --topic ods_base_log

3.启动日志生成jar包

1
(base) [dw@hadoop116 rt_applog]$ java -jar gmall2020-mock-log-2020-12-18.jar

4.查看执行结果

Nginx搭建与负载均衡测试

该部分内容查看链接:Nginx搭建与负载均衡测试

*业务数据库数据采集

架构和FlinkCDC是这整个实时数仓的重点内容。业务数据采集中我们会用到FlinkCDC根据业务数据库的binlog来捕获变动数据,此外,我们还会对比FlinkCDC、Canal、Maxwell这三种CDC工具。

MYSQL数据准备

创建实时数据库:

导入建表数据:

开启binlog

1
2
3
(base) [dw@hadoop116 logs]$ sudo vim /etc/my.cnf
# 修改后重启
(base) [dw@hadoop116 logs]$ sudo systemctl restart mysqld

检验binlog生效

开启后进入到binlog所在目录下查看:

我们随便插入一条查看binlog有无变化:

查看下binlog变化:

发现发生变化,binlog已经准备好了。

业务数据抓取—FlinkCDC

请参阅此链接:FlinkCDC

业务数据抓取—Canal&Maxwell

CDC-Canal&Maxwell

FlinkCDC VS CanalCDC VS MaxwellCDC

FlinkCDC VS CanalCDC VS MaxwellCDC