Browse Source

fix :测试环境屏蔽kafka_3

bilingfeng 1 year ago
parent
commit
9e44224ab8

+ 1 - 1
game-module/game-module-sdk/src/main/java/com/zanxiang/game/module/sdk/SDKApplication.java

@@ -23,7 +23,7 @@ public class SDKApplication {
 
     public static void main(String[] args) {
         SpringApplication.run(SDKApplication.class, args);
-        System.out.println("赞象SDK服务启动成功 <测试环境屏蔽kafka_2> ( ´・・)ノ(._.`) \n" +
+        System.out.println("赞象SDK服务启动成功 <测试环境屏蔽kafka_3> ( ´・・)ノ(._.`) \n" +
                 " ___________ _   __\n" +
                 "/  ___|  _  \\ | / /\n" +
                 "\\ `--.| | | | |/ / \n" +

+ 80 - 10
game-module/game-module-sdk/src/main/java/com/zanxiang/game/module/sdk/config/KafkaConfig.java

@@ -1,15 +1,24 @@
 package com.zanxiang.game.module.sdk.config;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.*;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 import java.net.InetAddress;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.Future;
 
 /**
  * @author : lingfeng
@@ -23,14 +32,14 @@ public class KafkaConfig {
     @Value("${spring.kafka.game-sdk.bootstrap-servers}")
     private String gameSdkKafkaSevers;
 
-    /**
-     * 服务器域名
-     */
     @Value("${server.domain}")
     private String serverUrl;
 
     @Bean("gameSdkKafkaProducer")
-    public KafkaProducer<String, String> gameKafkaProducer() {
+    public Producer<String, String> gameKafkaProducer() {
+        if (this.serverUrl.contains("test")) {
+            return new TempKafkaProducer<>();
+        }
         String clientId = "UNKNOWN";
         try {
             clientId = InetAddress.getLocalHost().getHostAddress();
@@ -38,13 +47,74 @@ public class KafkaConfig {
             log.error(e.getMessage(), e);
         }
         Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, gameSdkKafkaSevers);
         props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        if (this.serverUrl.contains("test")) {
-            return new KafkaProducer<>(new Properties());
-        }
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, gameSdkKafkaSevers);
         return new KafkaProducer<>(props);
     }
+
+    public static class TempKafkaProducer<K, V> implements Producer<K, V> {
+
+        @Override
+        public void initTransactions() {
+
+        }
+
+        @Override
+        public void beginTransaction() throws ProducerFencedException {
+
+        }
+
+        @Override
+        public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, String s) throws ProducerFencedException {
+
+        }
+
+        @Override
+        public void commitTransaction() throws ProducerFencedException {
+
+        }
+
+        @Override
+        public void abortTransaction() throws ProducerFencedException {
+
+        }
+
+        @Override
+        public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
+            return null;
+        }
+
+        @Override
+        public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord, Callback callback) {
+            return null;
+        }
+
+        @Override
+        public void flush() {
+
+        }
+
+        @Override
+        public List<PartitionInfo> partitionsFor(String s) {
+            return null;
+        }
+
+        @Override
+        public Map<MetricName, ? extends Metric> metrics() {
+            return null;
+        }
+
+        @Override
+        public void close() {
+
+        }
+
+        @Override
+        public void close(Duration duration) {
+
+        }
+    }
+
 }

+ 3 - 6
game-module/game-module-sdk/src/main/java/com/zanxiang/game/module/sdk/service/impl/GameUserRoleServiceImpl.java

@@ -19,7 +19,7 @@ import com.zanxiang.module.redis.service.IDistributedLockComponent;
 import com.zanxiang.module.util.DateUtil;
 import com.zanxiang.module.util.JsonUtil;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
@@ -61,15 +61,12 @@ public class GameUserRoleServiceImpl extends ServiceImpl<GameUserRoleMapper, Gam
     @Value("${spring.kafka.game-sdk.gameRoleActiveTopic}")
     private String gameRoleActiveTopic;
 
-    /**
-     * 服务器域名
-     */
     @Value("${server.domain}")
     private String serverUrl;
 
     @Autowired
     @Qualifier("gameSdkKafkaProducer")
-    private KafkaProducer<String, String> kafkaProducer;
+    private Producer<String, String> kafkaProducer;
 
     @Override
     @Transactional(rollbackFor = Exception.class)
@@ -254,7 +251,7 @@ public class GameUserRoleServiceImpl extends ServiceImpl<GameUserRoleMapper, Gam
         }
         //测试环境不使用kafka
         if (this.serverUrl.contains("test")) {
-            return Boolean.FALSE;
+            return Boolean.TRUE;
         }
         //活跃提交
         Map<String, Object> activeParamMap = new HashMap<>(6);