中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁(yè) > news >正文

惠州建站公司seo建站的步驟

惠州建站公司,seo建站的步驟,做曖暖愛(ài)視頻網(wǎng)站,網(wǎng)站建設(shè)的意義和目的背景: 廣播狀態(tài)可以用于規(guī)則表或者配置表的實(shí)時(shí)更新,本文就是用一個(gè)欺詐檢測(cè)的flink作業(yè)作為例子看一下BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用 BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用 1.首先看主流…

背景:

廣播狀態(tài)可以用于規(guī)則表或者配置表的實(shí)時(shí)更新,本文就是用一個(gè)欺詐檢測(cè)的flink作業(yè)作為例子看一下BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用

BroadcastProcessFunction和KeyedBroadcastProcessFunction的使用

1.首先看主流程,主流程中使用了兩個(gè)Broadcast廣播的狀態(tài),這兩個(gè)Broadcast廣播的狀態(tài)是獨(dú)立的

 // 這里面包含規(guī)則廣播狀態(tài)的兩次使用方法,分別在DynamicKeyFunction處理函數(shù)和DynamicAlertFunction處理函數(shù),注意這兩個(gè)處理函數(shù)中的廣播狀態(tài)是獨(dú)立的,也就是需要分別維度,不能共享// Processing pipeline setupDataStream<Alert> alerts =transactions.connect(rulesStream).process(new DynamicKeyFunction()).uid("DynamicKeyFunction").name("Dynamic Partitioning Function").keyBy((keyed) -> keyed.getKey()).connect(rulesStream).process(new DynamicAlertFunction()).uid("DynamicAlertFunction").name("Dynamic Rule Evaluation Function");

2.BroadcastProcessFunction的處理,這里面會(huì)維護(hù)這個(gè)算子本身的廣播狀態(tài),并把所有的事件擴(kuò)散發(fā)送到下一個(gè)算子

public class DynamicKeyFunctionextends BroadcastProcessFunction<Transaction, Rule, Keyed<Transaction, String, Integer>> {@Overridepublic void open(Configuration parameters) {}// 這里會(huì)把每個(gè)事件結(jié)合上廣播狀態(tài)中的每個(gè)規(guī)則生成N條記錄,流轉(zhuǎn)到下一個(gè)算子@Overridepublic void processElement(Transaction event, ReadOnlyContext ctx, Collector<Keyed<Transaction, String, Integer>> out)throws Exception {ReadOnlyBroadcastState<Integer, Rule> rulesState =ctx.getBroadcastState(Descriptors.rulesDescriptor);forkEventForEachGroupingKey(event, rulesState, out);}// 獨(dú)立維護(hù)廣播狀態(tài),可以在廣播狀態(tài)中新增刪除或者清空廣播狀態(tài)@Overridepublic void processBroadcastElement(Rule rule, Context ctx, Collector<Keyed<Transaction, String, Integer>> out) throws Exception {log.info("{}", rule);BroadcastState<Integer, Rule> broadcastState =ctx.getBroadcastState(Descriptors.rulesDescriptor);handleRuleBroadcast(rule, broadcastState);if (rule.getRuleState() == RuleState.CONTROL) {handleControlCommand(rule.getControlType(), broadcastState);}}}static void handleRuleBroadcast(Rule rule, BroadcastState<Integer, Rule> broadcastState)throws Exception {switch (rule.getRuleState()) {case ACTIVE:case PAUSE:broadcastState.put(rule.getRuleId(), rule);break;case DELETE:broadcastState.remove(rule.getRuleId());break;}}

3.KeyedBroadcastProcessFunction的處理,這里面也是會(huì)維護(hù)這個(gè)算子本身的廣播狀態(tài),此外還有鍵值分區(qū)狀態(tài),特別注意的是在處理廣播元素時(shí),可以用applyToKeyedState方法對(duì)所有的鍵值分區(qū)狀態(tài)應(yīng)用某個(gè)方法,對(duì)于ontimer方法,依然可以訪問(wèn)鍵值分區(qū)狀態(tài)和廣播狀態(tài)

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.ververica.field.dynamicrules.functions;import static com.ververica.field.dynamicrules.functions.ProcessingUtils.addToStateValuesSet;
import static com.ververica.field.dynamicrules.functions.ProcessingUtils.handleRuleBroadcast;import com.ververica.field.dynamicrules.Alert;
import com.ververica.field.dynamicrules.FieldsExtractor;
import com.ververica.field.dynamicrules.Keyed;
import com.ververica.field.dynamicrules.Rule;
import com.ververica.field.dynamicrules.Rule.ControlType;
import com.ververica.field.dynamicrules.Rule.RuleState;
import com.ververica.field.dynamicrules.RuleHelper;
import com.ververica.field.dynamicrules.RulesEvaluator.Descriptors;
import com.ververica.field.dynamicrules.Transaction;
import java.math.BigDecimal;
import java.util.*;
import java.util.Map.Entry;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.accumulators.SimpleAccumulator;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;/** Implements main rule evaluation and alerting logic. */
@Slf4j
public class DynamicAlertFunctionextends KeyedBroadcastProcessFunction<String, Keyed<Transaction, String, Integer>, Rule, Alert> {private static final String COUNT = "COUNT_FLINK";private static final String COUNT_WITH_RESET = "COUNT_WITH_RESET_FLINK";private static int WIDEST_RULE_KEY = Integer.MIN_VALUE;private static int CLEAR_STATE_COMMAND_KEY = Integer.MIN_VALUE + 1;private transient MapState<Long, Set<Transaction>> windowState;private Meter alertMeter;private MapStateDescriptor<Long, Set<Transaction>> windowStateDescriptor =new MapStateDescriptor<>("windowState",BasicTypeInfo.LONG_TYPE_INFO,TypeInformation.of(new TypeHint<Set<Transaction>>() {}));@Overridepublic void open(Configuration parameters) {windowState = getRuntimeContext().getMapState(windowStateDescriptor);alertMeter = new MeterView(60);getRuntimeContext().getMetricGroup().meter("alertsPerSecond", alertMeter);}// 鍵值分區(qū)狀態(tài)和廣播狀態(tài)聯(lián)合處理,在這個(gè)方法中可以更新鍵值分區(qū)狀態(tài),然后廣播狀態(tài)只能讀取@Overridepublic void processElement(Keyed<Transaction, String, Integer> value, ReadOnlyContext ctx, Collector<Alert> out)throws Exception {long currentEventTime = value.getWrapped().getEventTime();addToStateValuesSet(windowState, currentEventTime, value.getWrapped());long ingestionTime = value.getWrapped().getIngestionTimestamp();ctx.output(Descriptors.latencySinkTag, System.currentTimeMillis() - ingestionTime);Rule rule = ctx.getBroadcastState(Descriptors.rulesDescriptor).get(value.getId());if (noRuleAvailable(rule)) {log.error("Rule with ID {} does not exist", value.getId());return;}if (rule.getRuleState() == Rule.RuleState.ACTIVE) {Long windowStartForEvent = rule.getWindowStartFor(currentEventTime);long cleanupTime = (currentEventTime / 1000) * 1000;ctx.timerService().registerEventTimeTimer(cleanupTime);SimpleAccumulator<BigDecimal> aggregator = RuleHelper.getAggregator(rule);for (Long stateEventTime : windowState.keys()) {if (isStateValueInWindow(stateEventTime, windowStartForEvent, currentEventTime)) {aggregateValuesInState(stateEventTime, aggregator, rule);}}BigDecimal aggregateResult = aggregator.getLocalValue();boolean ruleResult = rule.apply(aggregateResult);ctx.output(Descriptors.demoSinkTag,"Rule "+ rule.getRuleId()+ " | "+ value.getKey()+ " : "+ aggregateResult.toString()+ " -> "+ ruleResult);if (ruleResult) {if (COUNT_WITH_RESET.equals(rule.getAggregateFieldName())) {evictAllStateElements();}alertMeter.markEvent();out.collect(new Alert<>(rule.getRuleId(), rule, value.getKey(), value.getWrapped(), aggregateResult));}}}//維護(hù)廣播狀態(tài),新增/刪除或者整個(gè)清空,值得注意的是,處理廣播元素時(shí)可以對(duì)所有的鍵值分區(qū)狀態(tài)應(yīng)用某個(gè)函數(shù),比如這里當(dāng)收到某個(gè)屬于控制消息的廣播消息時(shí),使用applyToKeyedState方法把所有的鍵值分區(qū)狀態(tài)都清空@Overridepublic void processBroadcastElement(Rule rule, Context ctx, Collector<Alert> out)throws Exception {log.info("{}", rule);BroadcastState<Integer, Rule> broadcastState =ctx.getBroadcastState(Descriptors.rulesDescriptor);handleRuleBroadcast(rule, broadcastState);updateWidestWindowRule(rule, broadcastState);if (rule.getRuleState() == RuleState.CONTROL) {handleControlCommand(rule, broadcastState, ctx);}}private void handleControlCommand(Rule command, BroadcastState<Integer, Rule> rulesState, Context ctx) throws Exception {ControlType controlType = command.getControlType();switch (controlType) {case EXPORT_RULES_CURRENT:for (Map.Entry<Integer, Rule> entry : rulesState.entries()) {ctx.output(Descriptors.currentRulesSinkTag, entry.getValue());}break;case CLEAR_STATE_ALL:ctx.applyToKeyedState(windowStateDescriptor, (key, state) -> state.clear());break;case CLEAR_STATE_ALL_STOP:rulesState.remove(CLEAR_STATE_COMMAND_KEY);break;case DELETE_RULES_ALL:Iterator<Entry<Integer, Rule>> entriesIterator = rulesState.iterator();while (entriesIterator.hasNext()) {Entry<Integer, Rule> ruleEntry = entriesIterator.next();rulesState.remove(ruleEntry.getKey());log.info("Removed Rule {}", ruleEntry.getValue());}break;}}private boolean isStateValueInWindow(Long stateEventTime, Long windowStartForEvent, long currentEventTime) {return stateEventTime >= windowStartForEvent && stateEventTime <= currentEventTime;}private void aggregateValuesInState(Long stateEventTime, SimpleAccumulator<BigDecimal> aggregator, Rule rule) throws Exception {Set<Transaction> inWindow = windowState.get(stateEventTime);if (COUNT.equals(rule.getAggregateFieldName())|| COUNT_WITH_RESET.equals(rule.getAggregateFieldName())) {for (Transaction event : inWindow) {aggregator.add(BigDecimal.ONE);}} else {for (Transaction event : inWindow) {BigDecimal aggregatedValue =FieldsExtractor.getBigDecimalByName(rule.getAggregateFieldName(), event);aggregator.add(aggregatedValue);}}}private boolean noRuleAvailable(Rule rule) {// This could happen if the BroadcastState in this CoProcessFunction was updated after it was// updated and used in `DynamicKeyFunction`if (rule == null) {return true;}return false;}private void updateWidestWindowRule(Rule rule, BroadcastState<Integer, Rule> broadcastState)throws Exception {Rule widestWindowRule = broadcastState.get(WIDEST_RULE_KEY);if (rule.getRuleState() != Rule.RuleState.ACTIVE) {return;}if (widestWindowRule == null) {broadcastState.put(WIDEST_RULE_KEY, rule);return;}if (widestWindowRule.getWindowMillis() < rule.getWindowMillis()) {broadcastState.put(WIDEST_RULE_KEY, rule);}}// ontimer方法中可以訪問(wèn)/更新鍵值分區(qū)狀態(tài),讀取廣播狀態(tài),此外ontimer方法和processElement方法以及processBroadcastElement方法是同步的,不需要考慮并發(fā)訪問(wèn)的問(wèn)題@Overridepublic void onTimer(final long timestamp, final OnTimerContext ctx, final Collector<Alert> out)throws Exception {Rule widestWindowRule = ctx.getBroadcastState(Descriptors.rulesDescriptor).get(WIDEST_RULE_KEY);Optional<Long> cleanupEventTimeWindow =Optional.ofNullable(widestWindowRule).map(Rule::getWindowMillis);Optional<Long> cleanupEventTimeThreshold =cleanupEventTimeWindow.map(window -> timestamp - window);cleanupEventTimeThreshold.ifPresent(this::evictAgedElementsFromWindow);}private void evictAgedElementsFromWindow(Long threshold) {try {Iterator<Long> keys = windowState.keys().iterator();while (keys.hasNext()) {Long stateEventTime = keys.next();if (stateEventTime < threshold) {keys.remove();}}} catch (Exception ex) {throw new RuntimeException(ex);}}private void evictAllStateElements() {try {Iterator<Long> keys = windowState.keys().iterator();while (keys.hasNext()) {keys.next();keys.remove();}} catch (Exception ex) {throw new RuntimeException(ex);}}
}

ps: ontimer方法和processElement方法是同步訪問(wèn)的,沒(méi)有并發(fā)的問(wèn)題,所以不需要考慮同時(shí)更新鍵值分區(qū)狀態(tài)的線程安全問(wèn)題

參考文獻(xiàn):
https://flink.apache.org/2020/01/15/advanced-flink-application-patterns-vol.1-case-study-of-a-fraud-detection-system/

http://m.risenshineclean.com/news/58713.html

相關(guān)文章:

  • 無(wú)刷新網(wǎng)站b站推廣網(wǎng)站入口202
  • b2c網(wǎng)站服務(wù)內(nèi)容國(guó)家提供的免費(fèi)網(wǎng)課平臺(tái)
  • 心理測(cè)試用什么網(wǎng)站做上海最近3天疫情情況
  • 做網(wǎng)站做小時(shí)seo加盟
  • 有什么做3維的案例網(wǎng)站濟(jì)南網(wǎng)站seo
  • 點(diǎn)擊網(wǎng)絡(luò)怎么做網(wǎng)站合肥百度seo排名
  • 做產(chǎn)品批發(fā)生意用什么類(lèi)型的網(wǎng)站好備案域名查詢(xún)
  • 兩學(xué)一做注冊(cè)網(wǎng)站嗎搜索引擎營(yíng)銷(xiāo)的特點(diǎn)有
  • 唯美個(gè)人網(wǎng)站欣賞黃頁(yè)網(wǎng)站推廣效果
  • 青島網(wǎng)站開(kāi)發(fā)企業(yè)百度提交入口網(wǎng)址截圖
  • 深圳做網(wǎng)站哪家公司好廈門(mén)網(wǎng)站建設(shè)公司
  • 公司微網(wǎng)站怎么建設(shè)網(wǎng)站關(guān)鍵詞公司
  • 做5g網(wǎng)站空間容量要多少錢(qián)長(zhǎng)尾詞在線挖掘
  • 航達(dá)建設(shè)網(wǎng)站正規(guī)的培訓(xùn)機(jī)構(gòu)有哪些
  • 如何生成網(wǎng)站的二維碼輸入關(guān)鍵詞自動(dòng)生成標(biāo)題
  • 凡科做網(wǎng)站html網(wǎng)站模板免費(fèi)
  • 做機(jī)械的專(zhuān)業(yè)外貿(mào)網(wǎng)站有哪些鏈接下載
  • python做網(wǎng)站原理怎么宣傳自己的產(chǎn)品
  • 有做網(wǎng)站設(shè)計(jì)的嗎引擎優(yōu)化seo是什么
  • 怎樣做網(wǎng)站關(guān)鍵詞優(yōu)化網(wǎng)站推廣優(yōu)化外包公司哪家好
  • 真人男女直接做的視頻網(wǎng)站深圳華強(qiáng)北新聞最新消息今天
  • 怎么做網(wǎng)站架構(gòu)網(wǎng)絡(luò)推廣公司收費(fèi)標(biāo)準(zhǔn)
  • 網(wǎng)站首頁(yè)圖片素材長(zhǎng)圖大全搜索引擎哪個(gè)最好用
  • wordpress站群版seo包年服務(wù)
  • 建筑公司電話號(hào)碼重慶網(wǎng)頁(yè)優(yōu)化seo
  • 網(wǎng)站名稱(chēng)能用商標(biāo)做名稱(chēng)嗎公司網(wǎng)絡(luò)推廣網(wǎng)站
  • 網(wǎng)站的著陸頁(yè)啟信聚客通網(wǎng)絡(luò)營(yíng)銷(xiāo)策劃
  • wordpress 禁用縮略圖電腦優(yōu)化大師官方免費(fèi)下載
  • WordPress太占空間了如何優(yōu)化培訓(xùn)體系
  • 沈陽(yáng)男科醫(yī)院哪家好點(diǎn)兒濟(jì)南seo怎么優(yōu)化