
Update ad spend table sync

15135239248@163.com 2 years ago
parent
commit
0e0866defc

+ 14 - 0
src/main/java/com/qucheng/game/data/oldsystem/common/qcConfig.java

@@ -0,0 +1,14 @@
+package com.qucheng.game.data.oldsystem.common;
+
+public class qcConfig {
+
+    public static final String MYSQL_DRIVER = "com.mysql.cj.jdbc.Driver";
+
+    // MySQL connection settings for the big-data platform
+    // JDBC URL; autoReconnect=true is appended, otherwise connections time out
+    public static final String MYSQL_URL_BIGDATA = "jdbc:mysql://qc-game-cluster.rwlb.rds.aliyuncs.com:3306/quchen_text?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&autoReconnect=true";
+    public static final String MYSQL_USERNAME_BIGDATA = "qc";
+    public static final String MYSQL_PASSWORD_BIGDATA = "Qc_1234567";
+
+}
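
Note: this config hard-codes production credentials in source control. A safer variant (a sketch, not part of this commit; the environment-variable names are assumptions) would read the same settings from the environment:

// Sketch only: same settings sourced from environment variables.
package com.qucheng.game.data.oldsystem.common;

public class QcConfigFromEnv {
    public static final String MYSQL_DRIVER = "com.mysql.cj.jdbc.Driver";
    // MYSQL_URL_BIGDATA etc. are assumed variable names, not ones this repo defines
    public static final String MYSQL_URL_BIGDATA = env("MYSQL_URL_BIGDATA");
    public static final String MYSQL_USERNAME_BIGDATA = env("MYSQL_USERNAME_BIGDATA");
    public static final String MYSQL_PASSWORD_BIGDATA = env("MYSQL_PASSWORD_BIGDATA");

    private static String env(String name) {
        String value = System.getenv(name);
        if (value == null) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value;
    }
}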

+ 97 - 0
src/main/java/com/qucheng/game/data/oldsystem/ods/OdsDataFilling.java

@@ -0,0 +1,97 @@
+package com.qucheng.game.data.oldsystem.ods;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+import com.qucheng.game.data.oldsystem.serialization.CustomerDeserializationSchema;
+import com.qucheng.game.data.oldsystem.sink.ads_sink;
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.connectors.mysql.table.StartupOptions;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * ODS-layer data filling.
+ * Filters columns and renames fields before writing to the sink.
+ */
+public class OdsDataFilling {
+
+    public static void main(String[] args) throws Exception {
+        System.setProperty("HADOOP_USER_NAME", "flink");
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        // Build the Flink CDC MySQL source and read the binlog
+        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
+                .hostname("118.31.103.66")
+                .port(3306)
+                .username("root")
+                .password("Qc_1234567")
+                .databaseList("ods")
+                .tableList("ods.byte_t_ad_data_day")
+                .deserializer(new CustomerDeserializationSchema())
+                // 5400 and 6400
+                .startupOptions(StartupOptions.initial())
+                .build();
+
+        DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "ODS_Mysql_Source");
+
+        dataStreamSource.map(new FieldFilterMap()).addSink(new ads_sink());
+
+
+        // Start the job
+        env.execute("GAME_ODS_FILLING");
+    }
+
+    public static class FieldFilterMap implements MapFunction<String, JSONObject> {
+        @Override
+        public JSONObject map(String value) throws Exception {
+
+            JSONObject result = new JSONObject();
+            JSONObject data = JSON.parseObject(value);
+
+            String tableName = data.getString("tableName");
+            JSONObject after = data.getJSONObject("after");
+            JSONObject key = data.getJSONObject("key");
+            String afterFinal = "";
+            String keyFinal = "";
+            String tableNameFinal = "";
+
+            if (tableName.equals("byte_t_ad_data_day")) {
+
+                List<String> columns = Arrays.asList("account_id", "day",
+                        "cost", "show", "click", "ctr");
+
+                after.entrySet().removeIf(next -> !columns.contains(next.getKey()));
+
+                key.remove("ad_id");
+
+                keyFinal = key.toString().replaceAll("\"day\":", "\"date\":");
+
+                afterFinal = after.toString().replaceAll("\"day\":", "\"date\":")
+                        .replaceAll("\"show\":", "\"view_count\":")
+                        .replaceAll("\"click\":", "\"valid_click_count\":");
+
+
+                tableNameFinal = "daily_tt";
+            }
+
+            Object afterFinalJson = JSONObject.parse(afterFinal);
+            Object keyFinalJson = JSONObject.parse(keyFinal);
+
+            result.put("after", afterFinalJson);
+            result.put("key", keyFinalJson);
+            result.put("type", data.getString("type"));
+            result.put("tableName", tableNameFinal);
+            result.put("db", "quchen_text");
+
+            return result;
+        }
+    }
+}
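
FieldFilterMap keeps only account_id/day/cost/show/click/ctr, renames day to date, show to view_count, and click to valid_click_count, drops ad_id from the key, and retargets the record at quchen_text.daily_tt. A small harness (a sketch, not part of the commit; the field values are invented) to exercise it without a CDC source:

import com.alibaba.fastjson2.JSONObject;
import com.qucheng.game.data.oldsystem.ods.OdsDataFilling;

public class FieldFilterMapDemo {
    public static void main(String[] args) throws Exception {
        // Hand-built record in the shape CustomerDeserializationSchema emits
        JSONObject key = new JSONObject();
        key.put("account_id", 1001);
        key.put("day", "2023-01-01");
        key.put("ad_id", 42);            // removed by the map

        JSONObject after = new JSONObject();
        after.put("account_id", 1001);
        after.put("day", "2023-01-01");  // renamed to "date"
        after.put("cost", 12.5);
        after.put("show", 300);          // renamed to "view_count"
        after.put("click", 7);           // renamed to "valid_click_count"
        after.put("ctr", 0.023);
        after.put("cpm", 41.7);          // not in the whitelist, filtered out

        JSONObject record = new JSONObject();
        record.put("db", "ods");
        record.put("tableName", "byte_t_ad_data_day");
        record.put("key", key);
        record.put("after", after);
        record.put("type", "insert");

        JSONObject out = new OdsDataFilling.FieldFilterMap().map(record.toJSONString());
        System.out.println(out);  // tableName=daily_tt, db=quchen_text
    }
}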

+ 156 - 0
src/main/java/com/qucheng/game/data/oldsystem/serialization/CustomerDeserializationSchema.java

@@ -0,0 +1,156 @@
+package com.qucheng.game.data.oldsystem.serialization;
+
+import com.alibaba.fastjson2.JSONObject;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import io.debezium.data.Envelope;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+public class CustomerDeserializationSchema implements DebeziumDeserializationSchema<String> {
+
+    // The formatted output record looks like this:
+
+    /**
+     * {
+     * "db":"",
+     * "tableName":"",
+     * "key":{"id":"1001",...},
+     * "after":{"id":"1001","name":""...},
+     * "type":""
+     * }
+     */
+
+    @Override
+    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
+
+        // SimpleDateFormat is not thread-safe, so the formatters are per-call
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+        SimpleDateFormat sdfs = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+        // JSON object that wraps the result record
+        JSONObject result = new JSONObject();
+
+        //获取库名&表名
+        String topic = sourceRecord.topic();
+        String[] fields = topic.split("\\.");
+
+        result.put("db", fields[1]);
+        result.put("tableName", fields[2]);
+
+        // Extract the primary-key fields
+        Struct key = (Struct) sourceRecord.key();
+        JSONObject keyJson = new JSONObject();
+        if (key != null) {
+            for (Field field : key.schema().fields()) {
+                String fieldName = field.name();
+                String schemaName = field.schema().name();
+
+                Object fieldValue = key.get(field);
+                String type = field.schema().type().name().toLowerCase();
+                // io.debezium.time.Date: days since epoch, render as yyyy-MM-dd
+                if ("int32".equals(type) && "io.debezium.time.Date".equals(schemaName)) {
+                    int day = (int) fieldValue;
+                    long second = day * 24 * 60 * 60L;
+                    Date date = new Date();
+                    date.setTime(second * 1000);
+                    keyJson.put(fieldName, sdf.format(date));
+                    // io.debezium.time.Timestamp: epoch millis, shift 8h for GMT+8
+                } else if ("int64".equals(type) && "io.debezium.time.Timestamp".equals(schemaName)) {
+                    long times = (long) fieldValue;
+                    String dateTime = sdfs.format(times - 8 * 60 * 60 * 1000);
+
+                    keyJson.put(fieldName, dateTime);
+                    // Decode BLOB/TEXT bytes as UTF-8 (Decimal is also bytes, so skip it)
+                } else if ("bytes".equals(type) && !"org.apache.kafka.connect.data.Decimal".equals(schemaName)) {
+                    ByteBuffer byteBuffer2 = (ByteBuffer) fieldValue;
+                    String s = StandardCharsets.UTF_8.decode(byteBuffer2).toString();
+                    keyJson.put(fieldName, s);
+                } else {
+                    keyJson.put(fieldName, fieldValue);
+                }
+            }
+        }
+
+        // Attach the key; used for updates and deletes
+        result.put("key", keyJson);
+
+        Struct value = (Struct) sourceRecord.value();
+
+        // Extract the after image (empty for deletes)
+        Struct after = value.getStruct("after");
+        JSONObject afterJson = new JSONObject();
+        if (after != null) {
+            for (Field field : after.schema().fields()) {
+                String fieldName = field.name();
+                String schemaName = field.schema().name();
+
+                Object fieldValue = after.get(field);
+                String type = field.schema().type().name().toLowerCase();
+                // Null values pass through untouched; only convert non-nulls
+                if (fieldValue != null) {
+                    // io.debezium.time.Date: days since epoch, render as yyyy-MM-dd
+                    if ("int32".equals(type) && "io.debezium.time.Date".equals(schemaName)) {
+                        int day = (int) fieldValue;
+                        Long second = day * 24 * 60 * 60L;
+                        Date date = new Date();
+                        date.setTime(second * 1000);
+                        afterJson.put(fieldName, sdf.format(date));
+                        // io.debezium.time.Timestamp: epoch millis, shift 8h for GMT+8
+                    } else if ("int64".equals(type) && "io.debezium.time.Timestamp".equals(schemaName)) {
+                        long times = (long) fieldValue;
+                        String dateTime = sdfs.format(times - 8 * 60 * 60 * 1000);
+
+                        afterJson.put(fieldName, dateTime);
+                        // Decode BLOB/TEXT bytes as UTF-8 (Decimal is also bytes, so skip it)
+                    } else if ("bytes".equals(type) && !"org.apache.kafka.connect.data.Decimal".equals(schemaName)) {
+                        ByteBuffer byteBuffer2 = (ByteBuffer) fieldValue;
+                        String s = StandardCharsets.UTF_8.decode(byteBuffer2).toString();
+                        afterJson.put(fieldName, s);
+                    } else {
+                        afterJson.put(fieldName, fieldValue);
+                    }
+                } else {
+                    afterJson.put(fieldName, fieldValue);
+                }
+
+            }
+        }
+
+        result.put("after", afterJson);
+
+        // Map the Debezium operation onto insert/update/delete
+        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
+
+        String type = operation.toString().toLowerCase();
+        // Snapshot rows arrive as READ and new rows as CREATE; both become insert
+        if ("create".equals(type) || "read".equals(type)) {
+            type = "insert";
+        }
+
+        // Attach the operation type
+        result.put("type", type);
+
+        // Emit the serialized record
+        collector.collect(result.toJSONString());
+
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return BasicTypeInfo.STRING_TYPE_INFO;
+    }
+}
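
The manual day-count and 8-hour offset math above can be cross-checked with java.time, which also sidesteps SimpleDateFormat's thread-safety caveat. A sketch (not part of the commit; it assumes, as the code above does, that the source wrote GMT+8 wall-clock time and the JVM default zone is GMT+8):

import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class DebeziumTimeDemo {
    static final DateTimeFormatter DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    static final DateTimeFormatter DATETIME = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // io.debezium.time.Date arrives as an int: days since 1970-01-01
    static String fromDebeziumDate(int daysSinceEpoch) {
        return LocalDate.ofEpochDay(daysSinceEpoch).format(DATE);
    }

    // io.debezium.time.Timestamp arrives as epoch millis holding the original
    // wall-clock time as if it were UTC; formatting at UTC recovers it, which
    // is what "subtract 8h, then format in GMT+8" achieves in the schema above
    static String fromDebeziumTimestamp(long epochMillis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC)
                .format(DATETIME);
    }

    public static void main(String[] args) {
        System.out.println(fromDebeziumDate(19000));    // 2022-01-08
        System.out.println(fromDebeziumTimestamp(0L));  // 1970-01-01 00:00:00
    }
}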

+ 220 - 0
src/main/java/com/qucheng/game/data/oldsystem/sink/ads_sink.java

@@ -0,0 +1,220 @@
+package com.qucheng.game.data.oldsystem.sink;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+import org.apache.commons.dbcp2.BasicDataSource;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import static com.qucheng.game.data.oldsystem.common.qcConfig.*;
+
+public class ads_sink extends RichSinkFunction<JSONObject> {
+
+    //    private DruidDataSource druidDataSource = null;
+    // Connection settings
+
+    PreparedStatement ps;
+    BasicDataSource dataSource;
+    private Connection connection;
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        dataSource = new BasicDataSource();
+        connection = getConnection(dataSource);
+        connection.setAutoCommit(true);
+    }
+
+    @Override
+    public void invoke(JSONObject values, Context context) throws SQLException {
+        // Pull the table name, payload, database, and key
+        String sinkTable = values.getString("tableName");
+        JSONObject data = values.getJSONObject("after");
+        String database = values.getString("db");
+        JSONObject key = values.getJSONObject("key");
+
+        try {
+            // Route by operation type
+            String type = values.getString("type");
+            if (type.equals("insert") || type.equals("update")) {
+                upsert(connection, database, sinkTable, data, key.toString());
+            } else if (type.equals("delete")) {
+                deleteValues(connection, database, sinkTable, key.toString());
+            } else {
+                System.out.println("Record is neither insert, update, nor delete: " + values);
+            }
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+    }
+
+    // Upsert: try an UPDATE first; if no row matched, INSERT instead
+    public void upsert(Connection connection, String database, String sinkTable, JSONObject data, String key) throws SQLException {
+
+        StringBuffer prefix = new StringBuffer();
+        StringBuffer suffix = new StringBuffer();
+
+        JSONObject dataKey = JSON.parseObject(key);
+
+        for (Map.Entry<String, Object> entry : dataKey.entrySet()) {
+            String colName = entry.getKey();
+            Object colType = entry.getValue();
+
+            if (colType.getClass().getName().equals("java.lang.String")) {
+                suffix.append("`" + colName + "`" + " = ").append("'").append(colType).append("' and ");
+            } else {
+                suffix.append("`" + colName + "`" + " = ").append(colType).append(" and ");
+            }
+        }
+
+        // Build the SET clause from the after image
+        for (Map.Entry<String, Object> entry : data.entrySet()) {
+            String colName = entry.getKey();
+            Object colValue = entry.getValue();
+
+            if (colValue == null) {
+                prefix.append("`" + colName + "` = ").append("null").append(" ,");
+            } else if (colValue instanceof String) {
+                prefix.append("`" + colName + "` = '").append(colValue).append("' ,");
+            } else {
+                // Non-string values must not be quoted
+                prefix.append("`" + colName + "` = ").append(colValue).append(" ,");
+            }
+        }
+
+        // Strip the trailing "and "
+        String suffixSql = suffix.substring(0, suffix.length() - 4);
+
+        // Strip the trailing comma
+        String prefixSql = prefix.substring(0, prefix.length() - 1);
+
+        String sql = "update `" + database + "`.`" + sinkTable + "` set " + prefixSql + " where " + suffixSql;
+
+        // Prepare and run the UPDATE
+        ps = connection.prepareStatement(sql);
+        int updated = ps.executeUpdate();
+
+        // No row matched, so fall back to INSERT
+        if (updated == 0) {
+            StringBuffer strSqlValue = new StringBuffer();
+
+            Set<String> columns = data.keySet();
+            Collection<Object> values = data.values();
+
+            for (Object col : values) {
+                if (col instanceof String) {
+                    strSqlValue.append("'").append(col).append("',");
+                } else {
+                    // null and numeric values go in unquoted
+                    strSqlValue.append(col).append(",");
+                }
+            }
+
+            String join = "`" + StringUtils.join(columns, "`,`") + "`";
+
+            String insertSql = "insert into `" + database + "`.`" + sinkTable + "` (" +
+                    join + ") values (" + strSqlValue.substring(0, strSqlValue.length() - 1) + ");";
+
+            ps = connection.prepareStatement(insertSql);
+
+            // Execute the INSERT
+            ps.execute();
+        }
+    }
+
+
+    // Execute a delete, e.g.:
+    // delete from glm_test_cdc1.test2 where id = 1 and user_id = '1'
+    public void deleteValues(Connection connection, String database, String sinkTable, String key) throws SQLException {
+
+        JSONObject dataKey = JSON.parseObject(key);
+        StringBuffer strSqlValue = new StringBuffer();
+
+        for (Map.Entry<String, Object> entry : dataKey.entrySet()) {
+            String colName = entry.getKey();
+            Object colType = entry.getValue();
+
+            if (colType.getClass().getName().equals("java.lang.String")) {
+                strSqlValue.append("`" + colName + "`" + " = ").append("'").append(colType).append("' and ");
+            } else {
+                strSqlValue.append("`" + colName + "`" + " = ").append(colType).append(" and ");
+            }
+        }
+
+        String deleteSql = strSqlValue.substring(0, strSqlValue.length() - 4);
+
+        // Assemble the DELETE statement (backtick-quoted to match upsert)
+        String sql = "delete from `" + database + "`.`" + sinkTable + "` where " + deleteSql;
+
+        // Prepare and execute
+        ps = connection.prepareStatement(sql);
+        ps.execute();
+
+    }
+
+    private static Connection getConnection(BasicDataSource dataSource) {
+        dataSource.setDriverClassName(MYSQL_DRIVER);
+        // NOTE: point these at your own MySQL host, username, and password
+        dataSource.setUrl(MYSQL_URL_BIGDATA);
+        dataSource.setUsername(MYSQL_USERNAME_BIGDATA);
+        dataSource.setPassword(MYSQL_PASSWORD_BIGDATA);
+        // Connection-pool settings
+        // Connections created when the pool is initialized
+        dataSource.setInitialSize(10);
+        dataSource.setMaxTotal(20);
+        // Minimum idle connections; must be between 0 and maxTotal (default 0)
+        dataSource.setMinIdle(1);
+        dataSource.setValidationQuery("select 1");
+        dataSource.setTestWhileIdle(true);
+        dataSource.setTestOnBorrow(false);
+        dataSource.setTestOnReturn(false);
+        // Run the idle-connection evictor every 3 s (the value is in millis)
+        dataSource.setTimeBetweenEvictionRunsMillis(3 * 1000L);
+        // Reclaim connections idle for 30 min (the default)
+        dataSource.setMinEvictableIdleTimeMillis(30 * 60 * 1000L);
+
+        Connection con = null;
+        try {
+            con = dataSource.getConnection();
+            System.out.println("创建连接池:" + con);
+        } catch (Exception e) {
+            System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
+        }
+        return con;
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+
+        // Close the connection and release resources
+        if (connection != null) {
+            connection.close();
+        }
+        if (ps != null) {
+            ps.close();
+        }
+        if (dataSource != null) {
+            dataSource.close();
+        }
+    }
+
+}
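
upsert builds SQL by string concatenation, so values containing quotes would break the statement. A parameterized alternative (a sketch, not part of the commit) does the same try-update-then-insert in one round trip via MySQL's INSERT ... ON DUPLICATE KEY UPDATE, assuming the target table has a primary or unique key:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ParameterizedUpsert {

    public static void upsert(Connection conn, String db, String table,
                              Map<String, Object> row) throws SQLException {
        List<String> cols = new ArrayList<>(row.keySet());

        StringBuilder names = new StringBuilder();
        StringBuilder marks = new StringBuilder();
        StringBuilder updates = new StringBuilder();
        for (int i = 0; i < cols.size(); i++) {
            if (i > 0) {
                names.append(", ");
                marks.append(", ");
                updates.append(", ");
            }
            String col = cols.get(i);
            names.append("`").append(col).append("`");
            marks.append("?");
            updates.append("`").append(col).append("` = VALUES(`").append(col).append("`)");
        }

        String sql = "INSERT INTO `" + db + "`.`" + table + "` (" + names +
                ") VALUES (" + marks + ") ON DUPLICATE KEY UPDATE " + updates;

        // Values are bound as placeholders, so quoting and escaping are the driver's job
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            for (int i = 0; i < cols.size(); i++) {
                ps.setObject(i + 1, row.get(cols.get(i)));
            }
            ps.executeUpdate();
        }
    }
}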