建站之星備案百度推廣售后
DorisStreamLoadObserver 類是一個用于將數據加載到 Doris(以前稱為 Palo)數據庫中并監視加載過程的 Java 類。該類提供了一組方法,用于構建 HTTP 請求、處理 HTTP 響應以及監控數據加載的狀態。以下是每個方法的具體作用:
DorisStreamLoadObserver(Keys options): 這是類的構造函數,用于初始化加載數據所需的配置選項。
void streamLoad(WriterTuple data) throws Exception: 該方法是數據加載的主要方法。它將給定的數據(WriterTuple 對象)加載到 Doris 數據庫中。它構建了用于將數據發送到 Doris 的 HTTP 請求,并根據響應狀態來確定加載是否成功。如果加載失敗,它會拋出異常。
private void checkStreamLoadState(String host, String label) throws IOException: 這個方法用于檢查數據加載的狀態。它會不斷地輪詢 Doris 服務器,以獲取特定加載任務的最終狀態。根據加載狀態的不同,它可能會拋出異常或者在加載完成時返回。
private byte[] addRows(List<byte[]> rows, int totalBytes): 此方法根據給定的數據行和總字節數,構建用于加載的字節數組。它根據配置中的數據格式(CSV 或 JSON)將數據行連接起來,并添加適當的分隔符。
private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException: 該方法執行 HTTP PUT 請求,將數據加載到 Doris 數據庫中。它構建了包含數據的請求實體,發送到指定的加載 URL,并解析響應以獲取加載結果。
private String getBasicAuthHeader(String username, String password): 此方法用于生成基本身份驗證頭部,以便在 HTTP 請求中進行身份驗證。
private HttpEntity getHttpEntity(CloseableHttpResponse response): 這是一個實用方法,用于從 HTTP 響應中提取實體內容。
private String getLoadHost(): 該方法從配置選項中獲取用于加載數據的主機地址列表,并嘗試連接到這些主機以檢查其可用性。它會返回第一個可用的主機地址。
DorisStreamLoadObserver 類主要用于處理數據加載任務,它負責構建適當的 HTTP 請求,將數據發送到 Doris 數據庫,并監控加載任務的狀態。通過這些方法,可以實現將數據從外部系統加載到 Doris 數據庫中,并在加載過程中進行必要的狀態檢查和錯誤處理。
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;public class DorisStreamLoadObserver {private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoadObserver.class);private Keys options;private long pos;private static final String RESULT_FAILED = "Fail";private static final String RESULT_LABEL_EXISTED = "Label Already Exists";private static final String LAEBL_STATE_VISIBLE = "VISIBLE";private static final String LAEBL_STATE_COMMITTED = "COMMITTED";private static final String RESULT_LABEL_PREPARE = "PREPARE";private static final String RESULT_LABEL_ABORTED = "ABORTED";private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";public DorisStreamLoadObserver(Keys options) {this.options = options;}// 數(shù)據(jù)寫入 Doris 的主要方法public void streamLoad(WriterTuple data) throws Exception {String host = getLoadHost();if (host == null) {throw new IOException("load_url cannot be empty, or the host cannot connect. Please check your configuration.");}String loadUrl = new StringBuilder(host).append("/api/").append(options.getDatabase()).append("/").append(options.getTable()).append("/_stream_load").toString();LOG.info("Start to join batch data: rows[{}] bytes[{}] label[{}].", data.getRows().size(), data.getBytes(), data.getLabel());Map<String, Object> loadResult = put(loadUrl, data.getLabel(), addRows(data.getRows(), data.getBytes().intValue()));LOG.info("StreamLoad response :{}", JSONValue.toJSONString(loadResult));final String keyStatus = "Status";if (null == loadResult || !loadResult.containsKey(keyStatus)) {throw new IOException("Unable to flush data to Doris: unknown result status.");}LOG.debug("StreamLoad response:{}", JSONValue.toJSONString(loadResult));if (RESULT_FAILED.equals(loadResult.get(keyStatus))) {throw new IOException(new StringBuilder("Failed to flush data to Doris.\n").append(JSONValue.toJSONString(loadResult)).toString());} else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) {LOG.debug("StreamLoad response:{}", 
JSONValue.toJSONString(loadResult));checkStreamLoadState(host, data.getLabel());}}// 檢查數(shù)據(jù)加載狀態(tài)的方法private void checkStreamLoadState(String host, String label) throws IOException {int idx = 0;while (true) {try {TimeUnit.SECONDS.sleep(Math.min(++idx, 5));} catch (InterruptedException ex) {break;}try (CloseableHttpClient httpclient = HttpClients.createDefault()) {HttpGet httpGet = new HttpGet(new StringBuilder(host).append("/api/").append(options.getDatabase()).append("/get_load_state?label=").append(label).toString());httpGet.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));httpGet.setHeader("Connection", "close");try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {HttpEntity respEntity = getHttpEntity(resp);if (respEntity == null) {throw new IOException(String.format("Failed to flush data to Doris, Error " +"could not get the final state of label[%s].\n", label), null);}Map<String, Object> result = (Map<String, Object>) JSONValue.parse(EntityUtils.toString(respEntity));String labelState = (String) result.get("state");if (null == labelState) {throw new IOException(String.format("Failed to flush data to Doris, Error " +"could not get the final state of label[%s]. 
response[%s]\n", label, EntityUtils.toString(respEntity)), null);}LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState));switch (labelState) {case LAEBL_STATE_VISIBLE:case LAEBL_STATE_COMMITTED:return;case RESULT_LABEL_PREPARE:continue;case RESULT_LABEL_ABORTED:throw new DorisWriterExcetion(String.format("Failed to flush data to Doris, Error " +"label[%s] state[%s]\n", label, labelState), null, true);case RESULT_LABEL_UNKNOWN:default:throw new IOException(String.format("Failed to flush data to Doris, Error " +"label[%s] state[%s]\n", label, labelState), null);}}}}}// 根據(jù)格式將數(shù)據(jù)行拼接成字節(jié)數(shù)組private byte[] addRows(List<byte[]> rows, int totalBytes) {if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<>() : options.getLoadProps());byte[] lineDelimiter = DelimiterParser.parse((String) props.get("line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);for (byte[] row : rows) {bos.put(row);bos.put(lineDelimiter);}return bos.array();}if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) {ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 
2 : rows.size() + 1));bos.put("[".getBytes(StandardCharsets.UTF_8));byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);boolean isFirstElement = true;for (byte[] row : rows) {if (!isFirstElement) {bos.put(jsonDelimiter);}bos.put(row);isFirstElement = false;}bos.put("]".getBytes(StandardCharsets.UTF_8));return bos.array();}throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");}private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException {RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(120 * 1000).setConnectTimeout(120 * 1000).setConnectionRequestTimeout(120 * 1000).build();try (CloseableHttpClient httpclient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).setRedirectStrategy(new DefaultRedirectStrategy()).build()) {HttpPut httpPut = new HttpPut(loadUrl);httpPut.setHeader(HttpHeaders.CONTENT_TYPE, "application/octet-stream");httpPut.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));httpPut.setEntity(new ByteArrayEntity(data));try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {HttpEntity respEntity = getHttpEntity(resp);if (respEntity == null) {throw new IOException("Failed to flush data to Doris, Error could not get the response entity.");}return (Map<String, Object>) JSONValue.parse(EntityUtils.toString(respEntity));}}}// 構(gòu)造 HTTP 請求中的基本認(rèn)證頭部private String getBasicAuthHeader(String username, String password) {String credentials = username + ":" + password;byte[] credentialsBytes = credentials.getBytes(StandardCharsets.UTF_8);String base64Credentials = Base64.encodeBase64String(credentialsBytes);return "Basic " + base64Credentials;}// 從 HTTP 響應(yīng)中獲取實體內(nèi)容private HttpEntity getHttpEntity(CloseableHttpResponse response) {if (response != null) {return response.getEntity();}return null;}// 獲取用于加載數(shù)據(jù)的主機地址private String getLoadHost() {List<String> hosts = 
options.getDorisStreamLoadUrls();for (String host : hosts) {try {HttpURLConnection connection = (HttpURLConnection) new URL(host).openConnection();connection.setRequestMethod("HEAD");int responseCode = connection.getResponseCode();if (responseCode == HttpURLConnection.HTTP_OK) {return host;}} catch (IOException e) {LOG.warn("Failed to connect to host: {}", host);}}return null;}
}