liubing 7 months ago
commit
75493ea91b

+ 3 - 0
.gitignore

@@ -0,0 +1,3 @@
+target
+.idea
+.vscode

+ 8 - 0
Dockerfile

@@ -0,0 +1,8 @@
+FROM flink:1.19.1
+ENV KAFKA_BROKERS=kafka.observe.svc.cluster.local:9092
+ENV MYSQL_DSN=jdbc:mysql://mysql.observe.svc.cluster.local:3306/observe
+
+RUN mkdir -p $FLINK_HOME/usrlib
+RUN mkdir -p /conf
+
+COPY ./target/ob-agent-stream-1.0-SNAPSHOT.jar $FLINK_HOME/usrlib/ob-agent-stream.jar

+ 11 - 0
Dockerfile.js.prod

@@ -0,0 +1,11 @@
+FROM flink:1.19.1
+ENV KAFKA_BROKERS=21.47.83.201:40044
+ENV MYSQL_DSN=jdbc:mysql://mysql.observe.svc.cluster.local:3306/observe
+ENV CH_WRITE_QUEUE_SIZE=10
+ENV CH_WRITER_NUM=5
+ENV CH_WRITE_BUF_SIZE=100000000
+
+RUN mkdir -p $FLINK_HOME/usrlib
+RUN mkdir -p /conf
+
+COPY ./target/ob-agent-stream-1.0-SNAPSHOT.jar $FLINK_HOME/usrlib/ob-agent-stream.jar

+ 12 - 0
Dockerfile.js.test

@@ -0,0 +1,12 @@
+FROM flink:1.19.1
+ENV KAFKA_BROKERS=kafka.observe.svc.cluster.local:9092
+ENV MYSQL_DSN=jdbc:mysql://mysql.observe.svc.cluster.local:3306/observe
+
+ENV CH_WRITE_QUEUE_SIZE=10
+ENV CH_WRITER_NUM=5
+ENV CH_WRITE_BUF_SIZE=100000
+
+RUN mkdir -p $FLINK_HOME/usrlib
+RUN mkdir -p /conf
+
+COPY ./target/ob-agent-stream-1.0-SNAPSHOT.jar $FLINK_HOME/usrlib/ob-agent-stream.jar
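
Note: the three Dockerfiles differ only in their baked-in defaults. Every ENV set here (KAFKA_BROKERS, MYSQL_DSN, CH_WRITE_QUEUE_SIZE, CH_WRITER_NUM, CH_WRITE_BUF_SIZE) is read back at runtime through TraceStream.getEnv(), so each can be overridden per deployment without rebuilding the image.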

+ 37 - 0
Makefile

@@ -0,0 +1,37 @@
+.PHONY: jar docker-build docker-push js-test-docker-build js-test-docker-push js-prod-docker-build js-prod-docker-push
+
+version?=1.0.0
+tag=reg.cestong.com.cn/cecf/ob-agent-stream:${version}
+
+
+jar:
+	mvn package
+
+docker-build: jar
+	docker build . -t ${tag}
+
+docker-push: docker-build
+	docker push ${tag}
+	echo ${version} > ./version
+
+# test
+js_test_tag=hub.js.sgcc.com.cn/observe/ob-agent-stream-js-test:${version}
+js_test_save_path=~/tmp/oas-js-test-${version}.tar
+js-test-docker-build: jar
+	docker build . -t ${js_test_tag} -f Dockerfile.js.test
+	docker save ${js_test_tag} -o ${js_test_save_path}
+
+js-test-docker-push: js-test-docker-build
+	docker push ${js_test_tag}
+	echo ${version} > ./version
+
+
+# js prod
+js_prod_tag=hub.js.sgcc.com.cn/observe/ob-agent-stream-js-prod:${version}
+js-prod-docker-build: jar
+	docker build . -t ${js_prod_tag} -f Dockerfile.js.prod
+
+js-prod-docker-push: js-prod-docker-build
+	docker push ${js_prod_tag}
+	echo ${version} > ./version
+
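
Typical usage, with `version` overridable on the command line: `make docker-push version=1.0.1` builds the jar, bakes and pushes the image, and writes the version to ./version. The js-test flow additionally saves the image to a tarball under ~/tmp for offline transfer.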

+ 244 - 0
pom.xml

@@ -0,0 +1,244 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.cecf.observe</groupId>
+    <artifactId>ob-agent-stream</artifactId>
+    <version>1.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <name>Processor for agent-collected data</name>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <flink.version>1.19.1</flink.version>
+        <target.java.version>11</target.java.version>
+        <maven.compiler.source>${target.java.version}</maven.compiler.source>
+        <maven.compiler.target>${target.java.version}</maven.compiler.target>
+        <log4j.version>2.17.1</log4j.version>
+    </properties>
+
+    <repositories>
+        <repository>
+            <id>apache.snapshots</id>
+            <name>Apache Development Snapshot Repository</name>
+            <url>https://repository.apache.org/content/repositories/snapshots/</url>
+            <releases>
+                <enabled>false</enabled>
+            </releases>
+            <snapshots>
+                <enabled>true</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
+
+    <dependencies>
+        <!-- Apache Flink dependencies -->
+        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.opencsv</groupId>
+            <artifactId>opencsv</artifactId>
+            <version>5.7.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka</artifactId>
+            <version>3.2.0-1.19</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>8.0.33</version>
+        </dependency>
+
+        <!-- Add connector dependencies here. They must be in the default scope (compile). -->
+
+        <!-- Example:
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka</artifactId>
+			<version>${flink.version}</version>
+		</dependency>
+		-->
+
+        <!-- Add logging framework, to produce console output when running in the IDE. -->
+        <!-- These dependencies are excluded from the application JAR by default. -->
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <version>${log4j.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>${log4j.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.json</groupId>
+            <artifactId>json</artifactId>
+            <version>20230227</version>
+        </dependency>
+        <dependency>
+            <groupId>ru.ivi.opensource</groupId>
+            <artifactId>flink-clickhouse-sink</artifactId>
+            <version>1.4.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${log4j.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.jetbrains</groupId>
+            <artifactId>annotations</artifactId>
+            <version>15.0</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+
+            <!-- Java Compiler -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
+                <configuration>
+                    <source>${target.java.version}</source>
+                    <target>${target.java.version}</target>
+                </configuration>
+            </plugin>
+
+            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary
+            dependencies. -->
+            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.1.1</version>
+                <executions>
+                    <!-- Run shade goal on package phase -->
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <createDependencyReducedPom>false</createDependencyReducedPom>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
+                                    <exclude>com.google.code.findbugs:jsr305</exclude>
+                                    <exclude>org.slf4j:*</exclude>
+                                    <exclude>org.apache.logging.log4j:*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <!-- Do not copy the signatures in the META-INF folder.
+									Otherwise, this might cause SecurityExceptions when using the JAR. -->
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>com.cecf.observe.TraceStream</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+
+        <pluginManagement>
+            <plugins>
+
+                <!-- This improves the out-of-the-box experience in Eclipse by resolving some
+                warnings. -->
+                <plugin>
+                    <groupId>org.eclipse.m2e</groupId>
+                    <artifactId>lifecycle-mapping</artifactId>
+                    <version>1.0.0</version>
+                    <configuration>
+                        <lifecycleMappingMetadata>
+                            <pluginExecutions>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId>org.apache.maven.plugins</groupId>
+                                        <artifactId>maven-shade-plugin</artifactId>
+                                        <versionRange>[3.1.1,)</versionRange>
+                                        <goals>
+                                            <goal>shade</goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <ignore/>
+                                    </action>
+                                </pluginExecution>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId>org.apache.maven.plugins</groupId>
+                                        <artifactId>maven-compiler-plugin</artifactId>
+                                        <versionRange>[3.1,)</versionRange>
+                                        <goals>
+                                            <goal>testCompile</goal>
+                                            <goal>compile</goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <ignore/>
+                                    </action>
+                                </pluginExecution>
+                            </pluginExecutions>
+                        </lifecycleMappingMetadata>
+                    </configuration>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+    </build>
+</project>
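
`mvn package` (the Makefile's `jar` target) runs the shade plugin and emits target/ob-agent-stream-1.0-SNAPSHOT.jar with com.cecf.observe.TraceStream as the entry point; this is the artifact the Dockerfiles copy into $FLINK_HOME/usrlib. Flink itself and the slf4j/log4j artifacts stay out of the fat jar (provided scope plus the shade excludes) because the flink:1.19.1 base image already ships them.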

+ 137 - 0
src/main/java/com/cecf/observe/FillAppNameMapFunc.java

@@ -0,0 +1,137 @@
+package com.cecf.observe;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+
+public class FillAppNameMapFunc extends RichMapFunction<SpringBootSpan, SpringBootSpan> {
+    private Thread dictUpdateThread;
+
+    private Connection mysqlConn;
+
+    private final String mysqlDSN;
+    private final String mysqlPwd;
+    private final String mysqlUserName;
+
+    public FillAppNameMapFunc(String dsn, String u, String pwd) {
+        mysqlDSN = dsn;
+        mysqlUserName = u;
+        mysqlPwd = pwd;
+    }
+
+    public Connection connectMySQL() throws SQLException {
+
+        try {
+            return DriverManager.getConnection(mysqlDSN, mysqlUserName, mysqlPwd);
+        } catch (SQLException e) {
+            logger.error("failed to connect to MySQL as {} at {}", mysqlUserName, mysqlDSN, e);
+            throw e;
+        }
+    }
+
+    private final ReentrantReadWriteLock mapLock = new ReentrantReadWriteLock();
+
+    private Map<String, String> service2AppName = new HashMap<>();
+    private volatile boolean continueUpdateDict = true;
+    public static final Logger logger = LoggerFactory.getLogger(FillAppNameMapFunc.class);
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        mysqlConn = connectMySQL();
+
+        updateDictionary();
+        dictUpdateThread = new Thread(() -> {
+            while (continueUpdateDict) {
+                try {
+                    TimeUnit.SECONDS.sleep(600);
+                } catch (InterruptedException e) {
+                    logger.warn("sleep interrupted:", e);
+                    continue;
+                }
+                updateDictionary();
+            }
+            logger.info("serviceName2AppName dict update thread quitting");
+        });
+        dictUpdateThread.start();
+    }
+
+    @Override
+    public void close() throws Exception {
+        continueUpdateDict = false;
+        logger.info("stop begin, wait update thread");
+        dictUpdateThread.interrupt(); // wake the updater out of its 600s sleep so it can exit
+        dictUpdateThread.join();
+        if (mysqlConn != null) {
+            mysqlConn.close();
+        }
+        logger.info("stop done");
+    }
+
+    @Override
+    public SpringBootSpan map(SpringBootSpan value) {
+        if ((value.AppName == null || value.AppName.isEmpty() || value.AppName.equals("UNSET"))) {
+            mapLock.readLock().lock();
+            try {
+                value.AppName = service2AppName.getOrDefault(value.ServiceName, "UNSET");
+            } finally {
+                mapLock.readLock().unlock();
+            }
+        }
+        return value;
+    }
+
+    private boolean areHashMapsEqual(Map<String, String> map1, Map<String, String> map2) {
+        // Map.equals performs a null-safe, entry-wise comparison (and tolerates null app_name values)
+        return Objects.equals(map1, map2);
+    }
+
+    private void updateDictionary() {
+        Map<String, String> newService2AppName = new HashMap<>();
+        String query = "SELECT service_name, app_name FROM observe.data_service_appname where delete_at is null";
+        try (PreparedStatement preparedStatement = mysqlConn.prepareStatement(query);
+             ResultSet resultSet = preparedStatement.executeQuery()) {
+            while (resultSet.next()) {
+                String serviceName = resultSet.getString("service_name");
+                String appName = resultSet.getString("app_name");
+                newService2AppName.put(serviceName, appName);
+            }
+        } catch (SQLException e) {
+            logger.error("failed to query service to appName mapping:", e);
+            return;
+        }
+        if (areHashMapsEqual(newService2AppName, service2AppName)) {
+            logger.info("serviceName2AppName table unchanged, skipping update");
+            return;
+        }
+
+        mapLock.writeLock().lock();
+        try {
+            service2AppName = newService2AppName;
+            logger.info("serviceName2AppName dict updated, size: {}", service2AppName.size());
+        } finally {
+            mapLock.writeLock().unlock();
+        }
+    }
+
+}
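
One caveat in the refresher above: the single JDBC Connection opened in open() lives for the life of the task, and MySQL drops idle connections after wait_timeout. A minimal guard, assuming only standard JDBC (Connection.isValid is JDBC 4.0) and a hypothetical helper name, could run at the top of updateDictionary():

    private void ensureConnection() throws SQLException {
        // reopen if the server has dropped the connection; isValid pings with a 5s timeout
        if (mysqlConn == null || !mysqlConn.isValid(5)) {
            if (mysqlConn != null) {
                try { mysqlConn.close(); } catch (SQLException ignored) { }
            }
            mysqlConn = connectMySQL();
        }
    }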

+ 96 - 0
src/main/java/com/cecf/observe/JSONAnyValueList.java

@@ -0,0 +1,96 @@
+package com.cecf.observe;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class JSONAnyValueList {
+    Logger logger = LoggerFactory.getLogger(JSONAnyValueList.class);
+    Map<String, JSONObject> m;
+
+    public JSONAnyValueList(JSONArray arr) {
+        this.m = new HashMap<>();
+        for (int i = 0; i < arr.length(); i++) {
+            JSONObject o = arr.getJSONObject(i);
+            this.m.put(o.getString("key"), o.getJSONObject("value"));
+        }
+    }
+
+    public String getString(String k) {
+        JSONObject v = this.m.get(k);
+        if (v == null) {
+            return "";
+        }
+        try {
+            return v.getString("stringValue");
+        } catch (JSONException e) {
+            logger.warn("get [{}] from {} failed, not found, [{}]", "stringValue", k, v);
+            return "";
+        }
+    }
+
+    String anyValueToString(JSONObject v) {
+        for (Iterator<String> it = v.keys(); it.hasNext(); ) {
+            String valueKey = it.next();
+            if (valueKey.equals("boolValue")) {
+                return v.getBoolean("boolValue") ? "true" : "false";
+            }
+            if (valueKey.equals("doubleValue")) {
+                return Double.toString(v.getDouble("doubleValue"));
+            }
+            try {
+                return v.getString(valueKey);
+            } catch (JSONException je) {
+                logger.error("{} has no String-typed attr value for key {}", v, valueKey, je);
+                return "";
+            }
+        }
+        return "";
+    }
+
+    public Map<String, String> toStringMap() {
+        Map<String, String> strMap = new HashMap<>();
+        for (Map.Entry<String, JSONObject> en : m.entrySet()) {
+            String key = en.getKey();
+            JSONObject v = en.getValue();
+            //"arrayValue":{"values":[{"stringValue":"1YMWWN1N4O"},{"stringValue":"LS4PSXUNUM"},
+            // {"stringValue":"66VCHSJNUP"},{"stringValue":"HQTGWGPNH4"},{"stringValue":"L9ECAV7KIM"}]}}
+            if (v.has("arrayValue")) {
+                List<String> valueStrList = new ArrayList<>();
+                JSONObject arrayValueValue = v.getJSONObject("arrayValue");
+                JSONArray values = arrayValueValue.getJSONArray("values");
+                for (int i = 0; i < values.length(); i++) {
+                    JSONObject valueItem = values.getJSONObject(i);
+                    valueStrList.add(anyValueToString(valueItem));
+                }
+                strMap.put(key, String.join(";", valueStrList));
+                continue;
+            }
+            strMap.put(key, anyValueToString(v));
+        }
+        return strMap;
+    }
+
+    public int getInt(String k) {
+        JSONObject v = this.m.get(k);
+        if (v == null) {
+            return 0;
+        }
+        try {
+            return v.getInt("intValue");
+        } catch (JSONException e) {
+            // OTLP/JSON serializes int64 values as strings, so fall back to stringValue
+            try {
+                String si = v.getString("stringValue");
+                return Integer.parseInt(si);
+            } catch (JSONException | NumberFormatException se) {
+                logger.warn("no parseable [intValue]/[stringValue] in [{}] for key [{}]", v, k);
+                return 0;
+            }
+        }
+    }
+}
+
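
A minimal usage sketch of the class above, with the attribute shape it expects (OTLP/JSON encodes attributes as a list of {"key", "value"} pairs and serializes int64 values as strings):

    JSONArray attrs = new JSONArray(
            "[{\"key\":\"http.url\",\"value\":{\"stringValue\":\"/api/v1/ping\"}},"
          + "{\"key\":\"net.peer.port\",\"value\":{\"intValue\":\"8080\"}}]");
    JSONAnyValueList values = new JSONAnyValueList(attrs);
    String url = values.getString("http.url");  // "/api/v1/ping"
    int port = values.getInt("net.peer.port");  // 8080; absent keys fall back to 0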

+ 26 - 0
src/main/java/com/cecf/observe/ServiceNameFilter.java

@@ -0,0 +1,26 @@
+package com.cecf.observe;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+public class ServiceNameFilter implements FilterFunction<SpringBootSpan> {
+
+    public ServiceNameFilter(String[] excludeServiceList) {
+        this.excludeService = new HashSet<>(Arrays.asList(excludeServiceList));
+        this.excludeService.remove(""); // "".split(",") yields [""], which is not a real rule
+        this.enabled = !this.excludeService.isEmpty();
+    }
+
+    Set<String> excludeService;
+    boolean enabled;
+
+    @Override
+    public boolean filter(SpringBootSpan springBootSpan) throws Exception {
+        if (!this.enabled) {
+            return true;
+        }
+        return !this.excludeService.contains(springBootSpan.ServiceName);
+    }
+}

+ 16 - 0
src/main/java/com/cecf/observe/SpanEvent.java

@@ -0,0 +1,16 @@
+package com.cecf.observe;
+
+public class SpanEvent {
+    public String name = "";
+    public JSONAnyValueList attrs;
+
+    public TraceException getException() {
+        if (this.attrs == null) {
+            return null;
+        }
+        String exceptionMessage = this.attrs.getString("exception.message");
+        String exceptionStack = this.attrs.getString("exception.stacktrace");
+        String exceptionType = this.attrs.getString("exception.type");
+        return new TraceException(exceptionType, exceptionMessage, exceptionStack);
+    }
+}

+ 33 - 0
src/main/java/com/cecf/observe/SpanEvents.java

@@ -0,0 +1,33 @@
+package com.cecf.observe;
+
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SpanEvents {
+    JSONArray events = new JSONArray();
+
+    public SpanEvents(JSONArray arr) {
+        this.events = arr;
+    }
+
+    public List<SpanEvent> getEventsByName(String name) {
+        List<SpanEvent> ret = new ArrayList<>();
+        for (int i = 0; i < this.events.length(); i++) {
+            JSONObject e = this.events.getJSONObject(i);
+            if (e.has("name") && e.getString("name").equals(name)) {
+                SpanEvent fe = new SpanEvent();
+                fe.name = e.getString("name");
+                if (e.has("attributes")) {
+                    fe.attrs = new JSONAnyValueList(e.getJSONArray("attributes"));
+                }
+                ret.add(fe);
+            }
+        }
+        return ret;
+    }
+}
+
+
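
How SpanEvents and SpanEvent.getException() fit together, using the OTLP exception-event attribute names the code reads (stacktrace elided):

    JSONArray events = new JSONArray(
            "[{\"name\":\"exception\",\"attributes\":["
          + "{\"key\":\"exception.type\",\"value\":{\"stringValue\":\"java.io.IOException\"}},"
          + "{\"key\":\"exception.message\",\"value\":{\"stringValue\":\"connection reset\"}}]}]");
    for (SpanEvent ev : new SpanEvents(events).getEventsByName("exception")) {
        TraceException te = ev.getException();  // type and message from attrs, stacktrace ""
    }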

+ 15 - 0
src/main/java/com/cecf/observe/SpringBootFlatMapFunc.java

@@ -0,0 +1,15 @@
+package com.cecf.observe;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+
+public class SpringBootFlatMapFunc implements FlatMapFunction<List<SpringBootSpan>, SpringBootSpan> {
+    @Override
+    public void flatMap(List<SpringBootSpan> springBootSpans, Collector<SpringBootSpan> collector) throws Exception {
+        for (SpringBootSpan span : springBootSpans) {
+            collector.collect(span);
+        }
+    }
+}

+ 64 - 0
src/main/java/com/cecf/observe/SpringBootSpan.java

@@ -0,0 +1,64 @@
+package com.cecf.observe;
+
+import java.util.*;
+
+public class SpringBootSpan {
+    public SpringBootSpan() {
+        SpanAttributes = new HashMap<>();
+        ResourceAttributes = new HashMap<>();
+        RPCRequest = new HashMap<>();
+        Events = new ArrayList<>();
+        Links = new ArrayList<>();
+        Exceptions = new ArrayList<>();
+        AppName = "UNSET";
+    }
+
+    public boolean hasAppName() {
+        return AppName != null && !AppName.equals("UNSET") && !AppName.isEmpty();
+    }
+
+    public Date Timestamp;
+    public String TraceID;
+    public String SpanID;
+    public String ParentSpanID;
+    public String TraceState;
+    public String SpanName;
+    public int SpanKind;
+    public String ScopeName;
+    public String ScopeVersion;
+    public Map<String, String> SpanAttributes;
+    public String ServiceName;
+    public Map<String, String> ResourceAttributes;
+    public String AppName;
+    public long Duration;
+    public String ContainerID;
+    public String SrcIP;
+    public int SrcPort;
+    public String TargetIP;
+    public int TargetPort;
+
+    public int HttpCode;
+    public String HttpMethod;
+    public String HttpURL;
+    public String RPCType;
+    public String RPCName;
+    public Map<String, String> RPCRequest;
+    public int RPCResult;
+
+    public String FuncNamespace;
+    public String FuncName;
+    public int FuncLineNO;
+    public int FuncResult;
+    public int StatusCode;
+    public String StatusMessage;
+
+    public List<TraceEvent> Events;
+    public List<TraceLink> Links;
+    public String DBStatement;
+    public String DBConnectionString;
+    public List<TraceException> Exceptions;
+}
+
+

+ 171 - 0
src/main/java/com/cecf/observe/SpringBootSpanCSVConverter.java

@@ -0,0 +1,171 @@
+package com.cecf.observe;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import ru.ivi.opensource.flinkclickhousesink.ClickHouseSinkConverter;
+
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class SpringBootSpanCSVConverter implements ClickHouseSinkConverter<SpringBootSpan> {
+    Logger logger = LoggerFactory.getLogger(SpringBootSpanCSVConverter.class);
+
+    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+    @Override
+    public String convert(SpringBootSpan ss) {
+        StringBuilder builder = new StringBuilder();
+        builder.append("(");
+        addString(dateFormat.format(ss.Timestamp), builder);
+        addString(ss.TraceID, builder);
+        addString(ss.SpanID, builder);
+        addString(ss.ParentSpanID, builder);
+        addString(ss.TraceState, builder);
+        addString(ss.SpanName, builder);
+        addInt(ss.SpanKind, builder);
+        addString(ss.ServiceName, builder);
+        addMap(ss.ResourceAttributes, builder);
+        addString(ss.ScopeName, builder);
+        addString(ss.ScopeVersion, builder);
+        addMap(ss.SpanAttributes, builder);
+        addLong(ss.Duration, builder);
+        addInt(ss.StatusCode, builder);
+        addString(ss.StatusMessage, builder);
+
+        builder.append(list2ArrayStr(ss.Events.stream().map(s -> dateFormat.format(s.Timestamp)).collect(Collectors.toList())));
+        builder.append(",");
+        builder.append(list2ArrayStr(ss.Events.stream().map(s -> s.Name).collect(Collectors.toList())));
+        builder.append(",");
+        addMapList(ss.Events.stream().map(s -> s.Attributes).collect(Collectors.toList()), builder);
+
+        builder.append(list2ArrayStr(ss.Links.stream().map(s -> s.TraceID).collect(Collectors.toList())));
+        builder.append(",");
+        builder.append(list2ArrayStr(ss.Links.stream().map(s -> s.SpanID).collect(Collectors.toList())));
+        builder.append(",");
+        builder.append(list2ArrayStr(ss.Links.stream().map(s -> s.TraceState).collect(Collectors.toList())));
+        builder.append(",");
+        addMapList(ss.Links.stream().map(s -> s.Attributes).collect(Collectors.toList()), builder);
+
+        addInt(ss.HttpCode, builder);
+        addString(ss.HttpMethod, builder);
+        addString(ss.HttpURL, builder);
+
+        addString(ss.ContainerID, builder);
+
+        addString(ss.SrcIP, builder);
+        addInt(ss.SrcPort, builder);
+        addString(ss.TargetIP, builder);
+        addInt(ss.TargetPort, builder);
+
+        addString(ss.RPCType, builder);
+        addString(ss.RPCName, builder);
+        addMap(ss.RPCRequest, builder);
+        addInt(ss.RPCResult, builder);
+
+        addString(ss.FuncNamespace, builder);
+        addString(ss.FuncName, builder);
+        addInt(ss.FuncLineNO, builder);
+        addInt(ss.FuncResult, builder);
+
+        addString(ss.DBStatement, builder);
+        addString(ss.DBConnectionString, builder);
+
+
+        builder.append(list2ArrayStr(ss.Exceptions.stream().map(s -> s.Typ).collect(Collectors.toList())));
+        builder.append(",");
+        builder.append(list2ArrayStr(ss.Exceptions.stream().map(s -> s.Message).collect(Collectors.toList())));
+        builder.append(",");
+        builder.append(list2ArrayStr(ss.Exceptions.stream().map(s -> s.StackTrace).collect(Collectors.toList())));
+        builder.append(",");
+
+
+        addString(ss.AppName, builder, true);
+
+        builder.append(")");
+//        logger.info("values tuple: {}", builder);
+        return builder.toString();
+    }
+
+    String list2ArrayStr(List<String> ss) {
+        if (ss == null || ss.isEmpty()) {
+            return "[]";
+        }
+        List<String> ess = ss.stream().map(this::escapeStr).collect(Collectors.toList());
+        return "['" + String.join("','", ess) + "']";
+    }
+
+    void addMapList(List<Map<String, String>> ms, StringBuilder sb) {
+        if (ms == null || ms.isEmpty()) {
+            sb.append("[],");
+            return;
+        }
+        sb.append("[");
+        for (Map<String, String> m : ms) {
+            addMap(m, sb);
+        }
+        sb.deleteCharAt(sb.length() - 1);
+        sb.append("],");
+    }
+
+    void addMap(Map<String, String> m, StringBuilder builder) {
+        if (m == null || m.isEmpty()) {
+            builder.append("{},");
+            return;
+        }
+        builder.append("{");
+        for (String key : m.keySet()) {
+            addString(key, builder, true);
+            builder.append(":");
+            addString(m.get(key), builder, true);
+            builder.append(",");
+        }
+        builder.deleteCharAt(builder.length() - 1);
+        builder.append("},");
+    }
+
+
+    void addString(String s, StringBuilder builder) {
+        if (s == null) {
+            builder.append("null,");
+            return;
+        }
+        String es = escapeStr(s);
+        builder.append("'");
+        builder.append(es);
+        builder.append("'");
+        builder.append(",");
+    }
+
+    void addString(String s, StringBuilder builder, boolean end) {
+        addString(s, builder);
+        builder.deleteCharAt(builder.length() - 1);
+    }
+
+    void addInt(int s, StringBuilder builder) {
+        builder.append(String.valueOf(s));
+        builder.append(",");
+    }
+
+    void addInt(int s, StringBuilder builder, boolean end) {
+        builder.append(String.valueOf(s));
+    }
+
+    void addLong(long s, StringBuilder builder) {
+        builder.append(String.valueOf(s));
+        builder.append(",");
+    }
+
+    String escapeStr(String s) {
+        // escape backslashes and double single quotes so the value is safe inside a
+        // single-quoted ClickHouse VALUES literal
+        return s.replace("\\", "\\\\").replace("'", "''");
+    }
+
+    void addLong(long s, StringBuilder builder, boolean end) {
+        builder.append(String.valueOf(s));
+    }
+}
+
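
convert() renders each span as one ClickHouse VALUES tuple. A small sketch of the column helpers (package-private, so callable from a test in the same package):

    SpringBootSpanCSVConverter conv = new SpringBootSpanCSVConverter();
    StringBuilder sb = new StringBuilder();
    conv.addString("it's a test", sb);   // appends 'it''s a test',
    conv.addInt(200, sb);                // appends 200,
    conv.addMap(Map.of("k", "v"), sb);   // appends {'k':'v'},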

+ 279 - 0
src/main/java/com/cecf/observe/SpringBootSpanExtractMapFunc.java

@@ -0,0 +1,279 @@
+package com.cecf.observe;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class SpringBootSpanExtractMapFunc implements MapFunction<String, List<SpringBootSpan>> {
+    Logger logger = LoggerFactory.getLogger(SpringBootSpanExtractMapFunc.class);
+
+    // 2: SERVER, 3: CLIENT, 1: INTERNAL, 4: PRODUCER, 5: CONSUMER
+    private static final int SpanKindInternal = 1;
+    private static final int SpanKindServer = 2;
+    private static final int SpanKindClient = 3;
+    private static final int SpanKindProducer = 4;
+    private static final int SpanKindConsumer = 5;
+
+    String getRPCType(int spanKind, JSONAnyValueList attributes) {
+        // 2: SERVER, 3: CLIENT, 1: INTERNAL, 4: PRODUCER, 5: CONSUMER
+        // http check http.status_code . io.opentelemetry.apache-httpclient-4.0
+        // rpc check rpc.system. io.opentelemetry.grpc-1.6
+        if (spanKind == 1) {
+            return "local";
+        }
+        if (spanKind == 5) {
+            return "consumer";
+        }
+        if (spanKind == 4) {
+            return "producer";
+        }
+        if (spanKind == 2) {
+            // server
+            return "server";
+        }
+        // client: rpc, http
+        if (!attributes.getString("http.url").isEmpty()) {
+            return "http";
+        }
+        String rpcSys = attributes.getString("rpc.system");
+        if (rpcSys.equals("apache_dubbo")) {
+            return "dubbo";
+        } else if (!rpcSys.isEmpty()) {
+            return rpcSys;
+        }
+        String dbSys = attributes.getString("db.system");
+        if (dbSys.equals("mysql")) {
+            return "mysql";
+        } else if (!dbSys.isEmpty()) {
+            return dbSys;
+        }
+        return "client";
+    }
+
+    String getRPCName(int spanKind, String RPCType, String spanName, JSONAnyValueList attributes) {
+        if (spanKind == 3) {
+            if (RPCType.equals("dubbo")) {
+                return attributes.getString("rpc.method");
+            }
+            if (RPCType.equals("http")) {
+                return attributes.getString("http.target");
+            }
+        }
+        return spanName;
+    }
+
+    int getRPCResult(int spanKind, String RPCType, JSONAnyValueList attributes) {
+        if (spanKind == 3) {
+            if (RPCType.equals("http")) {
+                return attributes.getInt("http.status_code");
+            }
+        }
+        return 0;
+    }
+
+    String getFromJSONObject(JSONObject o, String k, String de) {
+        try {
+            return o.getString(k);
+        } catch (JSONException e) {
+            return de;
+        }
+    }
+
+    String getStringFromJSONObjectWithDefault(JSONObject obj, String key, String defaultValue) {
+        if (!obj.has(key)) {
+            return defaultValue;
+        }
+        try {
+            return obj.getString(key);
+        } catch (JSONException e) {
+            logger.error("get String [{}] from [{}] failed", key, obj);
+            return defaultValue;
+        }
+    }
+
+    int getIntFromJSONObjectWithDefault(JSONObject obj, String key, int defaultValue) {
+        if (!obj.has(key)) {
+            return defaultValue;
+        }
+        try {
+            return obj.getInt(key);
+        } catch (JSONException e) {
+            logger.error("get int [{}] from [{}] failed", key, obj);
+            return defaultValue;
+        }
+    }
+
+    private static final int StatusError = 2;
+    private static final int StatusUnset = 0;
+    private static final int StatusOK = 1;
+    Pattern pattern = Pattern.compile("-DAPP_NAME=(\\S+)");
+
+    SpringBootSpan parseSpan(JSONObject span, String serviceName, String scopeName, String serviceIP, String scopeVersion,
+                             String containerID, Map<String, String> resourceAttrs) {
+        SpringBootSpan clickhouseSpan = new SpringBootSpan();
+        // appName
+        String commandLine = resourceAttrs.get("process.command_line");
+        if (commandLine != null && !commandLine.isEmpty()) {
+            Matcher matcher = pattern.matcher(commandLine);
+            if (matcher.find()) {
+                String appAlias = matcher.group(1);
+                if (appAlias != null && !appAlias.isEmpty()) {
+                    clickhouseSpan.AppName = appAlias;
+                }
+            }
+        }
+        if (!clickhouseSpan.hasAppName()) {
+            String serviceNamespace = resourceAttrs.get("service.namespace");
+            if (serviceNamespace != null && !serviceNamespace.isEmpty() && !serviceNamespace.equals("UNSET")) {
+                clickhouseSpan.AppName = serviceNamespace;
+            } else {
+                String appNameInResource = resourceAttrs.get("app.name");
+                if (appNameInResource != null && !appNameInResource.isEmpty() && !appNameInResource.equals("UNSET")) {
+                    clickhouseSpan.AppName = appNameInResource;
+                }
+            }
+        }
+        // end appName
+        clickhouseSpan.TraceID = span.getString("traceId");
+        clickhouseSpan.ResourceAttributes = resourceAttrs;
+        clickhouseSpan.SpanID = span.getString("spanId");
+        clickhouseSpan.ParentSpanID = span.getString("parentSpanId");
+        clickhouseSpan.SpanKind = span.getInt("kind");
+        clickhouseSpan.ServiceName = serviceName;
+        clickhouseSpan.SpanName = span.getString("name");
+        clickhouseSpan.ScopeName = scopeName;
+        clickhouseSpan.ScopeVersion = scopeVersion;
+        clickhouseSpan.ContainerID = containerID;
+        clickhouseSpan.SrcIP = serviceIP;
+        clickhouseSpan.SrcPort = 0;
+        clickhouseSpan.TraceState = "";
+        clickhouseSpan.RPCRequest = new HashMap<>();
+
+        int statusCode = 0;
+        String statusMessage = "";
+        if (span.has("status")) {
+            JSONObject status = span.getJSONObject("status");
+            if (status.has("code")) {
+                statusCode = status.getInt("code");
+            }
+            if (status.has("message")) {
+                statusMessage = status.getString("message");
+            }
+        }
+
+
+        long startTimeNano = Long.parseLong(span.getString("startTimeUnixNano"));
+        long endTimeNano = Long.parseLong(span.getString("endTimeUnixNano"));
+        clickhouseSpan.Duration = endTimeNano - startTimeNano;
+        clickhouseSpan.Timestamp = Date.from(Instant.ofEpochMilli((long) (startTimeNano / 1e6)));
+
+
+        if (span.has("attributes")) {
+            JSONArray spanAttributes = span.getJSONArray("attributes");
+            JSONAnyValueList spanAttributesMap = new JSONAnyValueList(spanAttributes);
+            clickhouseSpan.TargetIP = spanAttributesMap.getString("net.peer.name");
+            clickhouseSpan.TargetPort = spanAttributesMap.getInt("net.peer.port");
+            clickhouseSpan.FuncName = spanAttributesMap.getString("code.function");
+            clickhouseSpan.FuncNamespace = spanAttributesMap.getString("code.namespace");
+            clickhouseSpan.SpanAttributes = spanAttributesMap.toStringMap();
+
+            if (statusCode == StatusError && statusMessage.isEmpty() && clickhouseSpan.SpanKind == SpanKindClient) {
+                // fill status message with http code
+                statusMessage = "status code:" + String.valueOf(spanAttributesMap.getInt("http.status_code"));
+            }
+
+            // http check http.status_code . io.opentelemetry.apache-httpclient-4.0
+            // rpc check rpc.system. io.opentelemetry.grpc-1.6
+            clickhouseSpan.RPCType = this.getRPCType(clickhouseSpan.SpanKind, spanAttributesMap);
+            clickhouseSpan.RPCName = this.getRPCName(clickhouseSpan.SpanKind, clickhouseSpan.RPCType,
+                    clickhouseSpan.SpanName, spanAttributesMap);
+            clickhouseSpan.RPCResult = this.getRPCResult(clickhouseSpan.SpanKind, clickhouseSpan.RPCType,
+                    spanAttributesMap);
+            clickhouseSpan.FuncLineNO = spanAttributesMap.getInt("code.lineno");
+            clickhouseSpan.DBStatement = spanAttributesMap.getString("db.statement");
+            clickhouseSpan.DBConnectionString = spanAttributesMap.getString("db.connection_string");
+        }
+        if (span.has("events")) {
+            JSONArray events = span.getJSONArray("events");
+            if (events != null && events.length() > 0) {
+                SpanEvents spanEvents = new SpanEvents(events);
+                List<SpanEvent> exEvents = spanEvents.getEventsByName("exception");
+                for (SpanEvent ev : exEvents) {
+                    clickhouseSpan.Exceptions.add(ev.getException());
+                }
+                for (int i = 0; i < events.length(); i++) {
+                    JSONObject e = events.getJSONObject(i);
+                    // optString avoids a JSONException (which would discard the whole span)
+                    // when timeUnixNano or name is absent
+                    String evTsStr = e.optString("timeUnixNano", null);
+                    if (evTsStr == null) {
+                        continue;
+                    }
+                    try {
+                        long evTs = Long.parseLong(evTsStr);
+                        String evName = e.optString("name", "");
+                        if (evTs > 0 && !evName.isEmpty()) {
+                            clickhouseSpan.Events.add(new TraceEvent(evTs, evName));
+                        }
+                    } catch (NumberFormatException exception) {
+                        logger.warn("timeUnixNano:{} not a valid long", evTsStr, exception);
+                    }
+                }
+            }
+        }
+        clickhouseSpan.FuncResult = 0;
+        clickhouseSpan.StatusCode = statusCode;
+        clickhouseSpan.StatusMessage = statusMessage;
+        return clickhouseSpan;
+    }
+
+    public List<SpringBootSpan> map(String value) {
+        try {
+            JSONObject msg = new JSONObject(value);
+            JSONArray resourceSpans = msg.getJSONArray("resourceSpans");
+            List<SpringBootSpan> ret = new ArrayList<>();
+            for (int i = 0; i < resourceSpans.length(); i++) {
+                JSONObject resourceSpan = resourceSpans.getJSONObject(i);
+                JSONArray resourceAttrs = resourceSpan
+                        .getJSONObject("resource")
+                        .getJSONArray("attributes");
+                JSONAnyValueList resourceAttributesAnyValue = new JSONAnyValueList(resourceAttrs);
+                String serviceName = resourceAttributesAnyValue.getString("service.name");
+                String containerID = resourceAttributesAnyValue.getString("container.id");
+                String serviceIP = resourceAttributesAnyValue.getString("pod.ip");
+                JSONArray scopeSpans = resourceSpan.getJSONArray("scopeSpans");
+                for (int j = 0; j < scopeSpans.length(); j++) {
+                    JSONObject scopeSpan = scopeSpans.getJSONObject(j);
+                    JSONObject scope = scopeSpan.getJSONObject("scope");
+                    String scopeName = getFromJSONObject(scope, "name", "UNKNOWN");
+                    String scopeVersion = this.getFromJSONObject(scope, "version", "");
+                    JSONArray spans = scopeSpan.getJSONArray("spans");
+                    for (int k = 0; k < spans.length(); k++) {
+                        JSONObject span = spans.getJSONObject(k);
+                        try {
+                            SpringBootSpan clickhouseSpan = this.parseSpan(span, serviceName, scopeName, serviceIP,
+                                    scopeVersion, containerID, resourceAttributesAnyValue.toStringMap());
+                            ret.add(clickhouseSpan);
+                        } catch (JSONException e) {
+                            logger.error("parse span failed", e);
+                            continue;
+                        }
+                    }
+                }
+            }
+            return ret;
+        } catch (JSONException e) {
+            logger.error("[TraceSpringBoot]: parse json failed", e);
+            return new ArrayList<>();
+        }
+    }
+
+}
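
The field accesses above imply this minimal payload shape; a sketch that drives map() end to end (ids and timestamps are placeholders):

    String sample = "{\"resourceSpans\":[{"
          + "\"resource\":{\"attributes\":[{\"key\":\"service.name\",\"value\":{\"stringValue\":\"demo\"}}]},"
          + "\"scopeSpans\":[{\"scope\":{\"name\":\"io.opentelemetry.tomcat-10.0\"},"
          + "\"spans\":[{\"traceId\":\"0af7651916cd43dd8448eb211c80319c\",\"spanId\":\"b7ad6b7169203331\","
          + "\"parentSpanId\":\"\",\"name\":\"GET /ping\",\"kind\":2,"
          + "\"startTimeUnixNano\":\"1700000000000000000\",\"endTimeUnixNano\":\"1700000000250000000\"}]}]}]}";
    List<SpringBootSpan> spans = new SpringBootSpanExtractMapFunc().map(sample);
    // one span: ServiceName "demo", SpanKind 2 (server), Duration 250000000 ns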

+ 18 - 0
src/main/java/com/cecf/observe/TraceEvent.java

@@ -0,0 +1,18 @@
+package com.cecf.observe;
+
+import java.time.Instant;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TraceEvent {
+    public TraceEvent(long nano, String name) {
+        Timestamp = Date.from(Instant.ofEpochMilli((long) (nano / 1e6)));
+        Name = name;
+        Attributes = new HashMap<>();
+    }
+
+    public Date Timestamp;
+    public String Name;
+    public Map<String, String> Attributes;
+}

+ 14 - 0
src/main/java/com/cecf/observe/TraceException.java

@@ -0,0 +1,14 @@
+package com.cecf.observe;
+
+public class TraceException {
+
+    public TraceException(String t, String m, String st) {
+        Typ = t;
+        Message = m;
+        StackTrace = st;
+    }
+
+    public String Typ;
+    public String Message;
+    public String StackTrace;
+}

+ 11 - 0
src/main/java/com/cecf/observe/TraceLink.java

@@ -0,0 +1,11 @@
+package com.cecf.observe;
+
+import java.util.Map;
+
+public class TraceLink {
+    public String TraceID;
+    public String SpanID;
+    public String TraceState;
+    public Map<String, String> Attributes;
+}
+

+ 101 - 0
src/main/java/com/cecf/observe/TraceStream.java

@@ -0,0 +1,101 @@
+package com.cecf.observe;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import ru.ivi.opensource.flinkclickhousesink.ClickHouseSink;
+import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseClusterSettings;
+import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+public class TraceStream {
+
+    private static String getEnv(String key, String de) {
+        String value = System.getenv(key);
+        if (value == null || value.isEmpty()) {
+            return de;
+        }
+        return value;
+    }
+
+    public static final Logger logger = LoggerFactory.getLogger(TraceStream.class);
+
+    static KafkaSource<String> buildKafkaSource() {
+        String brokers = getEnv("KAFKA_BROKERS", "kafka-headless.observe.svc.cluster.local:9092");
+        return KafkaSource.<String>builder()
+                .setBootstrapServers(brokers)
+                .setTopics(getEnv("KAFKA_TOPIC", "otel"))
+                .setGroupId(getEnv("KAFKA_GROUP", "ob-agent-stream"))
+                .setStartingOffsets(OffsetsInitializer.latest())
+                .setValueOnlyDeserializer(new SimpleStringSchema())
+                .build();
+    }
+
+    public static void main(String[] args) throws Exception {
+        Configuration envConfig = new Configuration();
+        envConfig.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
+        envConfig.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.MAX_VALUE); // number of restart attempts
+        envConfig.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofSeconds(10)); // delay
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(envConfig);
+        Map<String, String> globalParameters = new HashMap<>();
+
+        String chHosts = getEnv("CH_HOST", "http://clickhouse.observe.svc.cluster.local:8123");
+        globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, chHosts);
+        globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_USER, getEnv("CH_USER", "default"));
+        globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, getEnv("CH_PWD", "cecf@cestong.com"));
+
+        globalParameters.put(ClickHouseSinkConst.TIMEOUT_SEC, getEnv("CH_TIMEOUT", "10"));
+        globalParameters.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, "./failed_record");
+        globalParameters.put(ClickHouseSinkConst.NUM_WRITERS, getEnv("CH_WRITER_NUM", "1"));
+        globalParameters.put(ClickHouseSinkConst.NUM_RETRIES, getEnv("CH_RETRY_NUM", "3"));
+        globalParameters.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, getEnv("CH_WRITE_QUEUE_SIZE", "10"));
+        globalParameters.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "true");
+
+        String excludeService = getEnv("EXCLUDE_SERVICE", "");
+        String[] excludeServiceList = excludeService.split(",");
+
+        ParameterTool parameters = ParameterTool.fromMap(globalParameters);
+        env.getConfig().setGlobalJobParameters(parameters);
+        KafkaSource<String> source = buildKafkaSource();
+
+        Properties props = new Properties();
+        props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, getEnv("CH_TABLE_NAME", "otel.traces_local"));
+        props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, getEnv("CH_WRITE_BUF_SIZE", "1000000000"));
+
+        SpringBootSpanCSVConverter converter = new SpringBootSpanCSVConverter();
+
+
+        String mysqlDSN = getEnv("MYSQL_DSN", "jdbc:mysql://m1.cestong.com.cn:31511/observe");
+        String mysqlUserName = getEnv("MYSQL_USER", "root");
+        String mysqlPwd = getEnv("MYSQL_PWD", "pg3mfWRtYonekZWB");
+
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), "KafkaSource")
+                .map(new SpringBootSpanExtractMapFunc())
+                .name("InfoExtract")
+                .flatMap(new SpringBootFlatMapFunc())
+                .name("ResourceSpansFlatToSpan")
+                .filter(new ServiceNameFilter(excludeServiceList))
+                .name("FilterByServiceName")
+                .map(new FillAppNameMapFunc(mysqlDSN, mysqlUserName, mysqlPwd))
+                .name("AttachAppName")
+                .addSink(new ClickHouseSink<>(props, converter))
+                .name("ClickhouseSink");
+
+        logger.info("configuration complete, starting Flink job");
+        env.execute(getEnv("FLINK_JOB_NAME", "Ob stream operator"));
+    }
+
+
+}
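
For a quick local check without Kafka or ClickHouse, the parse and flatten stages can be driven from a fixed element; a sketch, assuming sampleJson holds an OTLP/JSON payload like the one shown after SpringBootSpanExtractMapFunc:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromElements(sampleJson)
            .map(new SpringBootSpanExtractMapFunc())
            .flatMap(new SpringBootFlatMapFunc())
            .print();  // dump parsed spans to stdout instead of sinking to ClickHouse
    env.execute("local-smoke-test");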

+ 25 - 0
src/main/resources/log4j2.properties

@@ -0,0 +1,25 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
#  limitations under the License.
+################################################################################
+
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

+ 1 - 0
version

@@ -0,0 +1 @@
+v1.3.1