探索Flink动态CEP:杭州银行的实战案例
摘要:本文撰写自杭州银行大数据工程师唐占峰、欧阳武林老师。将介绍 Flink 动态 CEP的定义与核心概念、应用场景、并深入探讨其技术实现并介绍使用方式。主要分为以下几个内容:
-
Flink动态CEP简介
-
Flink动态CEP的应用场景
-
Flink动态CEP的技术实现
-
Flink动态CEP的使用方式
-
杭州银行应用实践
Tips:点击「阅读原文」跳转阿里云实时计算 Flink~
金融行业大数据技术正在进入成熟期,数据的实时性在金融的实时监控和分析交易数据以识别洗钱行为、欺诈行为、和确保合规性是至关重要的。随着业务环境的快速变化,传统的静态规则引擎已经无法满足这些需求,因为它们在规则变更时需要重启服务,这会导致服务中断和延迟响应。我们引入由 Flink 发展过来的 Flink 动态 CEP 作为行内的动态规则引擎,它能够在不中断服务的情况下动态更新规则,适应不断变化的业务需求。
CEP 是复杂事件处理 Complex Event Processing 的缩写,而 Flink CEP 则是基于 Flink 实现的复杂事件处理库,它可以识别出数据流中符合特定模式(Pattern)的事件序列,并允许用户作出针对性处理。Flink 动态 CEP,作为 Flink CEP 的高级功能,进一步扩展了这一能力,它支持在不重启服务的情况下动态更新规则。这种动态性不仅提高了系统的灵活性和响应速度,还大大降低了维护成本和复杂性。
01
Flink 动态 CEP 简介
1. Flink 动态 CEP 的定义和核心概念
Flink 动态 CEP 是 Apache Flink 流处理框架的一个高级功能,它允许通过DataStream(数据流)作业方式运行支持动态规则更新的 Flink CEP 作业,对 数据流进行动态的捕获、清洗和分析。Flink 动态 CEP 做到了基于 Flink 全托管快速构建动态加载最新规则来处理上游的数据流,让用户有机会实时掌握数据中重要的高阶特征。
关键概念
①pattern(模式):模式是规则,也是定义规则的方式。一个模式可以是单例或者循环模式,单例只接受一个事件,循环模式可以接受多个事件。用户可以使用pattern 来识别匹配到的事件。多个 pattern 可以组成复杂模式,我们把由多个pattern 组成的复杂模式序列称为 patternProcessor(模式处理器)。
②事件流:事件流可以来自异构上游,可以是 kafka 数据,也可以是数据库表数据(如交易流水类的实时事件流)。当 Flink 动态 CEP 作业启动后,遇到实际输入事件流,Flink 会尝试识别定义的 patternProcessor 并进行动态匹配,最终得到匹配结果。
③动态匹配:Flink 动态 CEP 会实时识别事件流变化,并不断发送给下游算子,下游算子接收到发送的事件进行解析和反序列化后生成真正使用的 patternProcessor,根据最新的 patternProcessor 定义的规则进行动态匹配。
2. Flink 动态 CEP 解决的问题
Flink CEP 是一种规则引擎,是通过设置规则模式来匹配事件的。而频繁变化的交易、记账场景要求我们对初始规则进行调整或者对规则进行新增。例如一个 CEP 作业初始规则是转账用户在一分钟内连续进行3次转账后将其认为是风险操作。而在特殊场景,预期转账次数会多一点,一分钟3次的转账次数阈值可能不合适,在当前开源 Flink CEP 实现下,没法做到使用户无感的转换,只能重新编写 Java 代码,然后重启作业,以使最新的规则生效。这样的操作带来时间成本较高和重启作业代价高的问题。因为要走一遍完整的代码开发和打包上线流程对于对时间延迟敏感程度高的银行风控领域是难以接受的,且规则引擎里通常会维护很多不同的规则,如果简单的规则修改都需要较长的时间窗口,会影响其他人的使用,维护起来也比较困难。Flink 动态 CEP 很好的降低了传统规则引擎较高的时间成本并做到无需重启作业就能丝滑更新规则,以下是 Flink 动态 CEP 解决的主要问题:
①动态规则更新:传统规则引擎在规则变更时需要重新部署和启动作业,这会导致服务中断,影响系统的实时性和可用性。而 Flink 动态 CEP 允许在不中断服务的情况下动态加载和更新 CEP 规则,这意味着可以在运行时修改模式匹配逻辑,而无需重启整个 Flink 作业。
②多规则支持:在静态场景下使用多条规则时,传统 Flink CEP 需要创建多个 CepOperator(CEP算子),这会导致数据的额外拷贝,增加处理开销。Flink动态 CEP 支持在一个 Operator(算子)中处理多条规则,减少了数据拷贝,提高了处理效率。
③参数化 Condition 支持:Flink 动态 CEP 支持在 Json 格式规则描述中定义参数化的 Condition,提高了自定义 Condition 的拓展性,解决了动态添加新的 Condition 类实现的需求。
02
Flink 动态 CEP 的应用场景
Flink 动态 CEP 就像是一个智能监控系统,它不仅能在线识别风险行为(比如洗钱或欺诈),还能为实时营销助力,为业务赋能。和金融领域相关的应用场景如下:
1. 反洗钱
Flink 动态 CEP 可以监控银行账户的交易活动,识别出类似洗钱的行为。例如,可以设置规则来识别短时间内频繁的大额存款和取款行为,或者识别出与洗钱交易相关的账户之间的资金流动,从而触发反洗钱调查。也可以结合大数据技术和机器学习技术构建洗钱风险监测模型,更准确地识别可疑交易和潜在洗钱风险客户。还可以运用 Flink 动态 CEP 的流式计算技术实时分析处理客户的全链路交易信息,结合知识图谱、实时智能等技术,构建起全行级别反洗钱领域客户关系网络图,深入融合可疑交易特征,动态完整展现资金流转全貌。
2. 反欺诈
国内电信网络诈骗非常的猖獗,金融领域的反欺诈系统对电信网络诈骗案件能起到非常关键的作用,能及时阻断欺诈案件中的资金流动减少用户资金损失。反欺诈系统对系统本身分布式、实时性、规则灵活、复杂规则匹配能力要求非常高,而 Flink 动态 CEP 在 Flink 的分布式、实时性的特性基础上,增加复杂规则匹配和规则动态配置能力,为反欺诈系统提供一种很好的解决方案。
3. 实时营销
在金融客户申请信用卡的时候,客户通常需要完成填充基本信息、个人身份信息认证等多个步骤完成信用卡的申请。用户在多步骤申请信用卡的过程中,有可能会因为各种原因在其中的任意一个环节退出、失败或超时。针对这种情况,利用客户行为日志作为数据源,Flink 动态 CEP 可以利用多种规则对各个环节客户的行为数据做规则匹配、计算。并可以根据输出结果做多种营销策略的输出,如推送客户优惠券、推送消息给客户经理及时联系客户来提高营销效率,为业务赋能。
03
Flink 动态 CEP 的技术实现
根据以上背景并基于阿里在社区提出的 FLIP-200 方案,ververica-cep 开源demo,数据架构研发团队在部门内实现了一版 Flink 动态 CEP 的支持。下面详细介绍我们是如何实现的。
在 Flink 动态 CEP 中我们复用了 Flink 的 OperatorCoordinator(算子协调器)机制,用它来负责协调FLink作业中的各个 operator(算子)。OperatorCoordinator 在 JobManager 中运行,会给 TaskManager 的 Operator 发送事件,我们实现的 DynamicCEPOperatorCoordinator(动态 CEP 算子协调器)是 OperatorCoordinator 的实现类,它是 JobManager 中运行的线程,负责调用 PatternProcessorDiscoverer(模式处理器探查器)接口拿到最新的 PatternProcessor。Flink 动态 CEP 的整体架构图如下所示:
上图展示的是从数据库中读取序列化后的 PatternProcessor 的过程。可以看到OperatorCoordinator 会调用 PatternProcessorDiscoverer 接口从数据库中拿到最新的且序列化后的 PatternProcessor,拿到后它会发送给和它关联的DynamicCEPOp(动态cep算子)。DynamicCEPOp 接收到发送的事件进行解析和反序列化后,最终生成要使用的 PatternProcessor 并构造相应的NFA(非确定有限状态机)。之后即可使用新构造的NFA来处理上游发生的事件,并最终输出到下游。基于这样的方式,可以做到不停机的规则更新,且只有 OperatorCoordinator 和规则数据库交互,可以减少对数据库的访问,并利用Flink 的特性保证下游 sub_task 中使用规则的一致性。
了解了 Flink 动态CEP获取规则的流程,接下来要构建FlinkCEP作业,最重要的方法,就是构建 CEP.dynamicPatterns(),阿里云实时计算 Flink 版已经定义了CEP.dynamicPatterns()Api,该 API 定义代码如下:
public static <T, R> SingleOutputStreamOperator<R> dynamicPatterns(DataStream<T> input,PatternProcessorDiscovererFactory<T> patternProcessorDiscovererFactory,TimeBehaviour timeBehaviour,TypeInformation<R> outTypeInfo)
该方法入参说明如下:
参数 | 说明 |
---|---|
DataStream<T> input | 输入事件流 |
PatternProcessorDiscovererFactory<T> patternProcessorDiscovererFactory | 工厂对象,负责构造一个探查器(PatternProcessorDiscoverer),探查器负责获取最新规则,即构造一个PatternProcessor接口 |
TimeBehaviour TimeBehaviour | 描述FlinkCEP作业如何处理事件的时间属性。参数取值如下:ProcessingTime:代表按照ProcessingTime处理事件 EventTime:代表按照Event Time处理事件 |
TypeInformation<R> OutTypeInfo | 描述输出流的类型信息 |
dynamicPatterns() 方法中 input、OutTypeInfo 分别定义输入和输出流,TimeBehaviour 定义时间属性,这里不需要多做介绍,PatternProcessorDiscovererFactory<T>接口负责构造探查器 PatternProcessorDiscoverer 以拿到最新 PatternProcessor,在实现Flink动态CEP功能中起到关键作用,故本文着重对 patternProcessor、 PatternProcessorDiscoverer 两个接口及其实现类和负责拿到 PatternProcessor 并发送给下游算子的 DynamicCEPOperatorCoordinator 的代码进行详细。
1. patternProcessor接口及其实现
public interface PatternProcessor<IN> extends Serializable, Versioned{String getId();default Long getTimestamp(){return Long.MIN_VALUE;}Pattern<IN,?> getPattern(ClassLoader classLoader);PatternProcessFunction<IN,?> getPatternProcessFunction();
}
PatternProcessor 接口用于完整定义CEP中的一条规则。一个PatternProcessor 实现类包含一个确定的模式(Pattern)用于描述如何去匹配事件、一个 PatternProcessFunction 用于描述怎么处理一个匹配事件。除此之外还包括id和 version(可选)等用于标识 PatternProcessFunction 的信息属性。因此一个PatternProcessor既包含规则本身,也指明了规则触发时,Flink 作业如何做出响应。
@PublicEvolving
public class DefaultPatternProcessor<T> implements PatternProcessor<T> {/** The ID of the pattern processor. */private final String id;/** The version of the pattern processor. */private final Integer version;/** The pattern of the pattern processor. */private final String patternStr;private final @Nullable PatternProcessFunction<T, ?> patternProcessFunction;public DefaultPatternProcessor(final String id,final Integer version,final String pattern,final @Nullable PatternProcessFunction<T, ?> patternProcessFunction,final ClassLoader userCodeClassLoader) {this.id = checkNotNull(id);this.version = checkNotNull(version);this.patternStr = checkNotNull(pattern);this.patternProcessFunction = patternProcessFunction;}@Overridepublic String toString() {return "DefaultPatternProcessor{"+ "id='"+ id+ '\''+ ", version="+ version+ ", pattern="+ patternStr+ ", patternProcessFunction="+ patternProcessFunction+ '}';}@Overridepublic String getId() {return id;}@Overridepublic int getVersion() {return version;}@Overridepublic Pattern<T, ?> getPattern(ClassLoader classLoader) {try {return (Pattern<T, ?>) CepJsonUtils.convertJSONStringToPattern(patternStr, classLoader);} catch (Exception e) {throw new RuntimeException(e);}}@Overridepublic PatternProcessFunction<T,?> getPatternProcessFunction(){return patternProcessFunction;}
}
DefaultPatternProcessor 类是 PatternProcessor 的默认实现,它接收 id, version, pattern 字符串, PatternProcessFunction 和 ClassLoader 作为参数。并使用 checkNotNull 确保除了 patternProcessFunction 外的参数不为 null。它的 getPattern 方法中包括转换json字符串到CEP能识别的 pattern 的方法 convertJSONStringToPattern(),我们重写了 convertJSONStringToPattern() 方法,接受入参为我们指定的 classloader (类加载器)如下所示:
public static Pattern<?, ?> convertJSONStringToPattern(String jsonString, ClassLoader userCodeClassLoader) throws Exception {if (userCodeClassLoader == null) {LOG.warn("The given userCodeClassLoader is null. Will try to use ContextClassLoader of current thread.");return convertJSONStringToPattern(jsonString);}GraphSpec deserializedGraphSpec = objectMapper.readValue(jsonString, GraphSpec.class);return deserializedGraphSpec.toPattern(userCodeClassLoader);
}
它的核心方法 toPattern() 涉及到 GraphSpec 类和方法本身,GraphSpec 类是Flink 中用于描述 Pattern 序列化和反序列化的工具,它用于处理由节点 (Nodes) 和边 (Edges) 组成的图形结构。这里的节点可以是单独的 Pattern 或者是嵌套的 GraphSpec,边则定义了节点之间的关系和数据流的方向,这和数据库中存储的规则Dag紧密相关,这里不做过多解释,具体来看 toPattern() 方法的实现:
public Pattern<?, ?> toPattern(final ClassLoader classLoader) throws Exception {// Construct cache of nodes and edges for later usefinal Map<String, NodeSpec> nodeCache = new HashMap<>();for (NodeSpec node : nodes) {nodeCache.put(node.getName(), node);}final Map<String, EdgeSpec> edgeCache = new HashMap<>();for (EdgeSpec edgeSpec : edges) {edgeCache.put(edgeSpec.getSource(), edgeSpec);}String currentNodeName = findBeginPatternName();Pattern<?, ?> prevPattern = null;String prevNodeName = null;while (currentNodeName != null) {NodeSpec currentNodeSpec = nodeCache.get(currentNodeName);EdgeSpec edgeToCurrentNode = edgeCache.get(prevNodeName);Pattern<?, ?> currentPattern =currentNodeSpec.toPattern(prevPattern,afterMatchStrategy.toAfterMatchSkipStrategy(),prevNodeName == null? ConsumingStrategy.STRICT: edgeToCurrentNode.getType(),classLoader);if (currentNodeSpec instanceof GraphSpec) {ConsumingStrategy strategy =prevNodeName == null? ConsumingStrategy.STRICT: edgeToCurrentNode.getType();prevPattern =buildGroupPattern(strategy, currentPattern, prevPattern, prevNodeName == null);} else {prevPattern = currentPattern;}prevNodeName = currentNodeName;currentNodeName =edgeCache.get(currentNodeName) == null? null: edgeCache.get(currentNodeName).getTarget();}// Add window semanticsif (window != null && prevPattern != null) {prevPattern.within(this.window.getTime(), this.window.getType());}return prevPattern;
}
toPattern方法是 GraphSpec 类中的核心方法之一,它负责将 GraphSpec 对象序列化信息反序列化回 Pattern 对象。它的内部逻辑包含几个步骤:
①构建节点和边缓存:创建 nodeCache 和 edgeCache 映射,分别存储NodeSpec和 EdgeSpec 实例。这有助于在后续处理中快速查找和使用节点和边的信息
②确定开始节点:初始化 currentNodeName 变量,它表示当前处理的节点名称。这个值通过调用 findBeginPatternName() 方法获得,该方法确保从图中的开始节点开始处理。
③构建 Pattern 迭代:
使用循环迭代所有节点,从开始节点开始,根据边的信息向前构建模式。在每次迭代中:从 nodeCache 获取当前节点的 NodeSpec。从 edgeCache 获取从上一个节点到当前节点的 EdgeSpec(如果存在)。使用 NodeSpec 和 EdgeSpec 构建或更新当前的 Pattern。这涉及到根据消耗策略(ConsumingStrategy)来使用不同的 Pattern 方法,如 Pattern.begin(), Pattern.next(),Pattern.followedBy(), 或 Pattern.followedByAny()。最后更新 prevPattern 和 prevNodeName 为下一个迭代做准备。最终返回构建完成的Pattern对象。
以上详细介绍了 patternProcessor 接口实现和其中的关键方法,描述了可用的Pattern 构建过程。下面介绍 PatternProcessorDiscoverer 接口及其实现。
2. PatternProcessorDiscoverer接口及其实现
public abstract interface PatternProcessorDiscoverer<T> extends Closeable
{public abstract void discoverPatternProcessorUpdates(PatternProcessorManager<T> paramPatternProcessorManager);
}
PatternProcessorDiscoverer 接口用于描述如何发现 Processor。
我们基于阿里云默认周期性扫描外部存储的抽象类periodicPatternProcessorDiscoverer,提供了一个用于从支持 JDBC 协议的数据库中拉取最新规则的实现:JDBCPeriodicPatternProcessorDiscoverer
public class JDBCPeriodicPatternProcessorDiscoverer<T>extends PeriodicPatternProcessorDiscoverer<T> {private static final Logger LOG =LoggerFactory.getLogger(JDBCPeriodicPatternProcessorDiscoverer.class);private final String tableName;private final String userName;private final String password;private final String jdbcUrl;private final String tenant;private final List<PatternProcessor<T>> initialPatternProcessors;private final ClassLoader userCodeClassLoader;private Connection connection;private Statement statement;private ResultSet resultSet;private Map<String, Tuple4<String, Integer, String, String>> latestPatternProcessors = new ConcurrentHashMap<>();/*** Creates a new using the given initial {@link PatternProcessor} and the time interval how* often to check the pattern processor updates.> *** @param jdbcUrl The JDBC url of the database.> * @param jdbcDriver The JDBC driver of the database.> * @param initialPatternProcessors The list of the initial {@link PatternProcessor}.> * @param intervalMillis Time interval in milliseconds how often to check updates.>*/public JDBCPeriodicPatternProcessorDiscoverer(final String jdbcUrl,final String jdbcDriver,final String tableName,final String userName,final String password,@Nullable final String tenant,final ClassLoader userCodeClassLoader,@Nullable final List<PatternProcessor<T>> initialPatternProcessors,@Nullable final Long intervalMillis)throws Exception {super(intervalMillis);this.tableName = requireNonNull(tableName);this.initialPatternProcessors = initialPatternProcessors;this.userCodeClassLoader = userCodeClassLoader;this.userName = userName;this.password = password;this.jdbcUrl = jdbcUrl;this.tenant = tenant;Class.forName(requireNonNull(jdbcDriver));this.connection = DriverManager.getConnection(requireNonNull(jdbcUrl), userName, password);this.statement = this.connection.createStatement();}
JDBCPeriodicPatternProcessorDiscoverer 包括两个关键方法 arePatternProcessorsUpdated() 和 getLatestPatternProcessors(),分别用于判断 PatternProcessors 是否被更新和获取最新的 PatternProcessors。
@Override
public boolean arePatternProcessorsUpdated() throws SQLException {if (latestPatternProcessors == null&& !CollectionUtil.isNullOrEmpty(initialPatternProcessors)) {return true;}LOG.info("Start check is pattern processor updated.");if (statement == null) {try {this.connection = DriverManager.getConnection(requireNonNull(jdbcUrl), userName, password);this.statement = this.connection.createStatement();} catch (SQLException e) {LOG.error("Connect to database error!", e);throw e;}}try {String sql = buildQuerySql();LOG.info("Statement execute sql is {}", sql);resultSet = statement.executeQuery(sql);Map<String, Tuple4<String, Integer, String, String>> currentPatternProcessors = new ConcurrentHashMap<>();while (resultSet.next()) {LOG.debug("check getLatestPatternProcessors start :{}", resultSet.getString(1));String id = resultSet.getString("id");if (currentPatternProcessors.containsKey(id)&& currentPatternProcessors.get(id).f1 >= resultSet.getInt("version")) {continue;}currentPatternProcessors.put(id,new Tuple4<>(requireNonNull(resultSet.getString("id")),resultSet.getInt("version"),requireNonNull(resultSet.getString("pattern")),resultSet.getString("function")));}if (latestPatternProcessors == null|| isPatternProcessorUpdated(currentPatternProcessors)) {LOG.debug("latest pattern processors size is {}", currentPatternProcessors.size());latestPatternProcessors = currentPatternProcessors;return true;} else {return false;}} catch (SQLException e) {LOG.error("Pattern processor discoverer failed to check rule changes, will recreate connection.", e);try {statement.close();connection.close();connection = DriverManager.getConnection(requireNonNull(this.jdbcUrl), this.userName, this.password);statement = connection.createStatement();} catch (SQLException ex) {LOG.error("Connect pattern processor discovery database error.", ex);throw new RuntimeException("Cannot recreate connection to database.");}}return false;
}
arePatternProcessorsUpdated() 用于检查数据库中存储的模式处理器是否发生了更新,它首先会检查是否有尚未处理的初始模式处理器列表(initialPatternProcessors),如果存在未被处理的 PatternProcessor,则直接返回true。接着建立数据库连接,调用 buildQuerySql() 来执行 sql,用于从 tableName 指定的表中获取所有或特定租户 (tenant) 的模式处理器信息。然后处理sql的执行结果,对每一个 currentPatternProcessors,检查是否已存在或版本是否更旧。如果存在更旧的版本则跳过,否则更新 currentPatternProcessors 映射。如果 latestPatternProcessors 为空或存在更新,则用 currentPatternProcessors 更新 latestPatternProcessors,并返回 true,表示有更新。
@Override
public List<PatternProcessor<T>> getLatestPatternProcessors() throws Exception {LOG.debug("Start convert pattern processors to default pattern processor.");return latestPatternProcessors.values().stream().map(patternProcessor -> {try {String patternStr = patternProcessor.f2;GraphSpec graphSpec =CepJsonUtils.convertJSONStringToGraphSpec(patternStr);LOG.debug("Latest pattern processor is {}",CepJsonUtils.convertGraphSpecToJSONString(graphSpec));PatternProcessFunction<T, ?> patternProcessFunction = null;String id = patternProcessor.f0;int version = patternProcessor.f1;if (!StringUtils.isNullOrWhitespaceOnly(patternProcessor.f3)) {patternProcessFunction =(PatternProcessFunction<T, ?>)this.userCodeClassLoader.loadClass(patternProcessor.f3).getConstructor(String.class, int.class, String.class).newInstance(id, version, tenant);}return new DefaultPatternProcessor<>(patternProcessor.f0,patternProcessor.f1,patternStr,patternProcessFunction,this.userCodeClassLoader);} catch (Exception e) {LOG.error("Get the latest pattern processors of the discoverer failure. - ", e);e.printStackTrace();}return null;}).filter(pre -> pre != null).collect(Collectors.toList());
}
getLatestPatternProcessors() 方法涉及从数据库获取最新 PatternProcessors的过程,利用 StreamAPI,将存储在 ConcurrentHashMap 中的模式处理器信息转换为 PatternProcessor 列表。这里涉及到实例化的过程:根据模式处理器信息中的类名(patternProcessor.f3),通过类加载器加载并实例化自定义的 PatternProcessFunction。如果类名不为空或非空字符串,将其转换为对应的 Java 类,并调用构造函数,传入处理器的 id、version 和租户 tenant 信息。使用上述信息,创建一个 DefaultPatternProcessor 实例,封装模式字符串、自定义的处理器函数、类加载器等信息,最后返回一个PatternProcessor 列表,其中包含了从数据库中获取的所有模式处理器的最新实例。这些实例可以被 Flink 的 CEP 功能直接使用,以处理复杂事件模式匹配。
3. PatternProcessorDiscoverer接口及其实现
接下来介绍 DynamicCepOperatorCoordinator(动态CEP算子协调器),它承担着调用上文 PatternProcessorDiscoverer 接口从数据库中拿到最新的且序列化后的 PatternProcessor,并发送给和它关联的 DynamicCEPOp 的任务如下所示:
public class DynamicCepOperatorCoordinator<T> implements OperatorCoordinator {private static final Logger LOG =LoggerFactory.getLogger(DynamicCepOperatorCoordinator.class);private final DynamicCepOperatorCoordinatorContext cepCoordinatorContext;private final PatternProcessorDiscovererFactory discovererFactory;private final String operatorName;private boolean started;private volatile boolean closed;public DynamicCepOperatorCoordinator(String operatorName, PatternProcessorDiscovererFactory discovererFactory, DynamicCepOperatorCoordinatorContext context) {this.cepCoordinatorContext = context;this.discovererFactory = discovererFactory;this.operatorName = operatorName;this.started = false;this.closed = false;}@Overridepublic void start() throws Exception {Preconditions.checkState(!started, "Dynamic Cep Operator Coordinator Started!");LOG.info("Starting Coordinator for {}:{}", this.getClass().getSimpleName(), operatorName);cepCoordinatorContext.runInCoordinatorThreadWithFixedRate(()->{if (discovererFactory instanceof PeriodicPatternProcessorDiscovererFactory) {try {PeriodicPatternProcessorDiscoverer patternProcessorDiscoverer =(PeriodicPatternProcessorDiscoverer) discovererFactory.createPatternProcessorDiscoverer(cepCoordinatorContext.getUserCodeClassloader());boolean updated = patternProcessorDiscoverer.arePatternProcessorsUpdated();if (updated && started) {Set<Integer> subtasks = cepCoordinatorContext.getSubtasks();if (!patternProcessorDiscoverer.getLatestPatternProcessors().isEmpty()) {UpdatePatternProcessorEvent updatePatternProcessorEvent =new UpdatePatternProcessorEvent(patternProcessorDiscoverer.getLatestPatternProcessors());subtasks.forEach(subtaskId -> {cepCoordinatorContext.sendEventToOperator(subtaskId, updatePatternProcessorEvent);});}}} catch (Exception e) {LOG.error("Starting Coordinator failed", e);}}});started = true;}@Overridepublic void close() throws Exception {closed = true;cepCoordinatorContext.close();}@Overridepublic void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) throws Exception {LOG.info("Received event {} from operator {}.", event, subtask);}@Overridepublic void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) throws Exception {
// cepCoordinatorContext.runInCoordinatorThread(() -> {LOG.debug("Taking a state snapshot on operator {} for checkpoint {}", operatorName, checkpointId);try {resultFuture.complete("Dynamic cep".getBytes(StandardCharsets.UTF_8));} catch (Throwable e) {ExceptionUtils.rethrowIfFatalErrorOrOOM(e);resultFuture.completeExceptionally(new CompletionException(String.format("Failed to checkpoint for dynamic cep %s",operatorName),e));}}@Overridepublic void notifyCheckpointComplete(long checkpointId) {}@Overridepublic void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception {}@Overridepublic void subtaskReset(int subtask, long checkpointId) {}@Overridepublic void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason) {cepCoordinatorContext.subtaskNotReady(subtask);}@Overridepublic void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway) {cepCoordinatorContext.subtaskReady(gateway);}
}
下面只介绍它的关键方法start(),用于负责初始化和激活协调器的运行流程:
start() 方法调用 cepCoordinatorContext.runInCoordinatorThreadWithFixedRate 来安排一个周期性执行的任务。这个方法将在框架的协调器线程中执行一个 lambda 表达式定义的任务,定期检查模式处理器更新。在这里我们定义的时间是10s,也就是每10s检查和执行一次 patternProcessors 的更新逻辑。然后构建UpdatePatternProcessorEvent,由 cepCoordinatorContext 来广播它给下游算子。需要注意的是,DynamicCepOperatorCoordinator 是 jobmanager 运行的线程,和 taskmanager 中 PatternProcessor 的产生过程是异步的。
04
Flink 动态 CEP 的使用方式
本章介绍如何编写 Flink 动态 CEP 作业,具体操作流程如下(以Kafka源为例):
1. 连接数据源(数据源也可以是来自数据库,配置不同的连接器即可)
public static void main(String[] args) throws Exception {
// Set up the streaming execution environmentfinal StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Classloader initialfinal ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
// Process args
// Build Kafka source with new Source API based on FLIP-27Properties prop =new Properties();prop.setProperty("security.protocol","SASL_PLAINTEXT");prop.setProperty("sasl.mechanism","SCRAM-SHA-256");prop.setProperty("sasl.jaas.config","org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule" +" required username=\"100670\" password=\"000000000\";");KafkaSource<Event> kafkaSource = KafkaSource.<Event>builder().setBootstrapServers("123.4.50.105:9292,123.4.60.106:9292,123.4.50.107:9292").setTopics("cep_test1").setGroupId("test").setStartingOffsets(OffsetsInitializer.earliest()).setProperties(prop).setValueOnlyDeserializer((new KafkaJsonDeserializer())).build();env.setParallelism(1);DataStream<Event> input = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "source");
// keyBy userId and productionId
// Notes, only events with the same key will be processd to see if there is a matchKeyedStream<Event, Tuple2<String, String>> keyedStream =input.keyBy(new KeySelector<Event, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> getKey(Event value) throws Exception {return Tuple2.of(value.getName(), value.getName());}});
①初始化执行环境
②Kafka 源配置,并将事件流 Event 根据 name 字段进行 keyby
2. 构建动态规则匹配
long time = 1000;
SingleOutputStreamOperator<String> output = CEP.dynamicPatterns(keyedStream,new JDBCPeriodicPatternProcessorDiscovererFactory<>("jdbc:mysql//123.45.6.789:3306/cep_demo_db","com.mysql.cj.jdbc.Driver","rds_demo","riskcollateral","riskcollateral",null,null,timer),TimeBehaviour.ProcessingTime,TypeInformation.of(new TypeHint<String>()){}));output.addSink(new PrintSinkFunction<>().name("cep"));env.excute("CEPDemo");}
}
3. 构建并运行
我们使用 Streampark 作为 Flink 作业的运维管控平台,根据以下步骤创建 Flink jar 包作业:
①添加jar包资源:
②添加作业:
③添加作业相关配置:
④发布及启动作业:
4. 插入规则
①建表 rds_demo 用于存储 cep 规则:
②插入动态更新规则:
将表示 Pattern 的 JSON 字符串与 id、version、function 类名一起插入 rds_demo 表中(阿里云实时计算Flink版定义了一套 JSON 格式的规则描述,详情请参加阿里云文档——动态 CEP 中规则的 JSON 格式定义):
id | version | pattern | function |
---|---|---|---|
1 | 1 | {"name":"end","quantifier":{"consumingStrategy}... | xxxpackage.dynamic.cep.core.DemoPatternProcessFunction |
将 pattern 的 JSON 字符串解析后,展示如下:
{"name": "end","quantifier": {"consumingStrategy": "SKIP_TILL_NEXT","properties": ["SINGLE"],"times": null,"untilCondition": null},"condition": null,"nodes": [{"name": "end","quantifier": {"consumingStrategy": "SKIP_TILL_NEXT","properties": ["SINGLE"],"times": null,"untilCondition": null},"condition": {"className": "xxxpackage.dynamic.cep.core.EndCondition","type": "CLASS"},"type": "ATOMIC"},{"name": "start","quantifier": {"consumingStrategy": "SKIP_TILL_NEXT","properties": ["LOOPING"],"times": null,"untilCondition": null},"type": "ATOMIC"}],"edges": [{"source": "start","target": "end","type": "SKIP_TILL_NEXT"}],"window": null,"afterMatchStrategy": {"type": "SKIP_PAST_LAST_EVENT","patternName": null},"type": "COMPOSITE","version": 1
}
这段 JSON 规则描述了一个复合模式 (COMPOSITE),它由两个原子节点(ATOMIC)组成:“start”和“end”。
这个模式目的是匹配一个特定的事件序列,其中“start”节点匹配 action 等于0的输入事件,而“end”节点匹配“xxxpackage.dynamic.cep.core.EndCondition”这个类定义的事件,这个条件由开发者定义,例如:
public class EndCondition extends SimpleCondition<Event> {@Overridepublic boolean filter(Event value) throws Exception {return value.getAction() != 1;}
}
这个 EndCondition 用于检查事件的 action 属性是否不等于1.如果事件的 action 属性不等于1,那么 filter 方法将返回 true,表示事件满足 end 节点的条件。
结合起来,这个模式的匹配的事件序列满足:“start”节点匹配所有 action 等于0的事件,一旦遇到一个 action 不等于1的事件,“end”节点的条件被满足,整个模式匹配完成。
function 字段用 DemoPatternProcessFunction 类的全路径加类名指定,记录了匹配到记录以后的处理方法如下:
public class DemoPatternProcessFunction<IN> extends PatternProcessFunction<IN, String> {String id;int version;String tenant;public DemoPatternProcessFunction(String id, int version, String tenant) {this.id = id;this.version = version;this.tenant = tenant;}@Overridepublic void processMatch(final Map<String, List<IN>> match, final Context ctx, final Collector<String> out) {StringBuilder sb = new StringBuilder();sb.append("A match for Pattern of (id, version): (").append(id).append(", ").append(version).append(") is found. The event sequence: ").append("\n");for (Map.Entry<String, List<IN>> entry : match.entrySet()) {sb.append(entry.getKey()).append(": ").append(entry.getValue().get(0).toString()).append("\n");}out.collect(sb.toString());}
}
这个处理方法是如果 PatternProcessor 匹配到一个事件序列,processMatch 方法将生成对应的描述性字符串,并由下游算子通过 Collector 将其输出。
5. 输入事件流
假如有一个事件序列如下:
private static void sendEvents(Producer<String, String> producer, String topic) {ObjectMapper objectMapper = new ObjectMapper();Event[] events = {new Event("ken", 1, 1, 0, 1662022777000L),new Event("ken", 2, 1, 0, 1662022778000L),new Event("ken", 3, 1, 1, 1662022779000L),new Event("ken", 4, 1, 2, 1662022780000L),new Event("ken", 5, 1, 1, 1662022780000L)};while (true) {try {for (Event event : events) {String json = objectMapper.writeValueAsString(event);ProducerRecord<String, String> record = new ProducerRecord<>(topic, json);producer.send(record, (metadata, exception) -> {if (exception != null) {LOG.error("Failed to send data to Kafka: ", exception);} else {System.out.println(metadata.topic());LOG.info("Data sent successfully to topic {} at offset {}",metadata.topic(), metadata.offset());}});}} catch (Exception e) {LOG.error("Error while sending events to Kafka: ", e);}}
}
我们往 Kafka Topic 插入 events,我们将会观察到 “start” 节点会匹配前两个事件,因为它们的 action 属性为0。第四个事件 action 不等于1,因此“end”节点的条件被满足,模式匹配完成。第五个事件不会影响已经完成的模式匹配。
05
杭州银行应用实践
杭州银行在我们开发的 Flink 动态 CEP 规则引擎下,也有实际的业务场景落地和应用,如事件中心-行为序列事件模块。
事件中心是以用户行为埋点数据作为数据源,对他们进行处理和分析,并输出结果辅助业务决策的平台。其中行为序列事件模块应用了行内开发的 Flink 动态 CEP 技术。事件中心-行为序列事件模块如下:
新增一个行为序列事件,填好基础信息后,用户可在行为序列配置里可以新增事件或事件组,并配置事件过期时间。
一个行为序列事件模板如下:
如下图所示,1-5原子事件表示某用户的埋点行为序列,作为 Flink 动态 CEP 的输入流 event 按照埋点顺序进入动态规则匹配,而匹配的规则是事件过期时间,这里为 20分钟。例如某输入流在 20分钟内还未完成全部五个原子事件,而只完成到事件4,这样则视为模式匹配完成,匹配到的事件为事件1到事件4,可以通过配置输出流输出自定义的规则匹配结果(如用户名字、错误原因、用户手机号码等)到 kafka、rocketMQ 等消息队列。如此,就能给业务更有价值的数据支持,做针对性的用户推荐。
Flink 动态 CEP 在事件中心实践中的优势体现在,修改或新增规则或事件序列,完全无需启停服务,只需直接编辑并保存。web 端修改会同步修改数据库中保存的规则,然后选择上线,动态规则转换就完成了。
【参考文献】
[1]阿里云开发者社区.(2023−02−10).Flink CEP 新特性进展与在实时风控场景的落地.阿里云开发者社区.https://developer.aliyun.com/article/1157197
[2]阿里云帮助中心. (2023-11-07). Flink 动态 CEP 快速入门_实时计算 Flink版(Flink). 阿里云帮助中心. https://help.aliyun.com/zh/flink/getting-started/getting-started-with-dynamic-flink-cep
[3]Apache Flink. (2022-09-16). FLIP-200: Support Multiple Rule and Dynamic Rule Changing (Flink CEP). Apache Flink. https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=195730308
[4]Apache Flink. (v1.15.4). FlinkCEP-Flink的复杂事件处理 . https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/libs/cep/
[5]https://github.com/RealtimeCompute/ververica-cep-demohttps://github.com/RealtimeCompute/ververica-cep-demo
相关文章:
探索Flink动态CEP:杭州银行的实战案例
摘要:本文撰写自杭州银行大数据工程师唐占峰、欧阳武林老师。将介绍 Flink 动态 CEP的定义与核心概念、应用场景、并深入探讨其技术实现并介绍使用方式。主要分为以下几个内容: Flink动态CEP简介 Flink动态CEP的应用场景 Flink动态CEP的技术实现 Flin…...
单机服务和微服务
单体服务 一种软件开发模型,它将所有的服务组件集成在一个独立的系统单位中进行开发、部署和维护。在这种架构中,前端用户界面、后端服务器逻辑、数据库操作等组件通常紧密耦合在一起,形成一个统一的程序。这种架构模式易于开发和部署&#x…...
孔雀鱼和斑马鱼能一起养吗?
在观赏鱼的世界里,孔雀鱼和斑马鱼都是备受鱼友喜爱的热门品种。它们独特的外形和相对容易的饲养条件,使得不少养鱼新手跃跃欲试将它们混养在一起,但这其中实则有诸多因素需要考量。 从生存环境来看,孔雀鱼和斑马鱼有一定的兼容性…...
作业帮基于 Apache DolphinScheduler 3_0_0 的缺陷修复与优化
文|作业帮大数据团队(阮文俊、孙建业) 背 景 基于 Apache DolphinScheduler (以下简称DolphinScheduler)搭建的 UDA 任务调度平台有效支撑了公司的业务数据开发需求,处理着日均百万级别的任务量。 整个 UDA 的架构如…...
【LC】111. 二叉树的最小深度
题目描述: 给定一个二叉树,找出其最小深度。 最小深度是从根节点到最近叶子节点的最短路径上的节点数量。 说明:叶子节点是指没有子节点的节点。 示例 1: 输入:root [3,9,20,null,null,15,7] 输出:2示…...
HarmonyOS NEXT 实战之元服务:静态案例效果--- 歌手推荐
背景: 前几篇学习了元服务,后面几期就让我们开发简单的元服务吧,里面丰富的内容大家自己加,本期案例 仅供参考 先上本期效果图 ,里面图片自行替换 效果图1完整代码案例如下: import { authentication } …...
selenium自动化测试(超详细~)
最近也有很多人私下问我,selenium学习难吗,基础入门的学习内容很多是3以前的版本资料,对于有基础的人来说,3到4的差别虽然有,但是不足以影响自己,但是对于没有学过的人来说,通过资料再到自己写的…...
Spring Boot教程之三十一:入门 Web
Spring Boot – 入门 Web 如今,大多数应用程序都需要模型-视图-控制器(MVC) 架构来满足各种需求,例如处理用户数据、提高应用程序效率、为应用程序提供动态特性。它主要用于构建桌面图形用户界面 (GUI),但现在越来越流行用于构建基于 Web 的…...
【每日学点鸿蒙知识】指纹识别隐藏背面、数组内部值变化刷新UI、键盘输入类型、跨组件路由、C++20特性支持
1、HarmonyOS 指纹识别情况下,隐藏背面内容? 有一个场景,在指纹识别验证页面时候,此时需要用户看不到背面的内容,请问应该怎么处理这块。或者有什么方案,可以通过window,获取到当前页面的page&…...
Python数据处理——re库与pydantic的使用总结与实战,处理采集到的思科ASA防火墙设备信息
目录 Python正则表达式re库的基本用法 引入re库 各函数功能 总结 使用方法举例 正则表达式语法与书写方式 正则表达式的常用操作符 思科ASA防火墙数据 数据1 数据2 书写正则表达式 Python中pydantic的使用 导入基础数据模板 根据数据采集目标定义Pydantic数据类型…...
centos系统如何安装kubectl和部署kube-apiserver
1.使用 yum 安装(推荐) 添加 Kubernetes 软件源: 首先,你需要添加 Kubernetes 的官方 YUM 软件源。这可以通过下载并安装 kubernetes.repo 文件来实现。 shell cat <<EOF | sudo tee /etc/yum.repos.d/kubernetes.repo [k…...
【源码编译】windows下mingw64安装以及cmake调用
最近因为安装MIRTK库,太多第三方依赖了,太折磨了,学习了使用Cmake,有些库又需要Fortran编译器,VS2022里面装了但又调用不了,也不知道为什么,最后装的mingw64,记录一下。 1、mingw64安…...
HarmonyOS NEXT 实战之元服务:静态案例效果---最近播放音乐
背景: 前几篇学习了元服务,后面几期就让我们开发简单的元服务吧,里面丰富的内容大家自己加,本期案例 仅供参考 先上本期效果图 ,里面图片自行替换 效果图1完整代码案例如下: Index import { authentica…...
【QT开发自制小工具】PDF/图片转excel---调用百度OCR API接口
前言 前几年WPS还可以免费处理5页以内的PDF转excel,现在必须付费了,而且百度其他在线的PDF转excel都是要收费的,刚好前几年调研过百度OCR的高精度含位置接口,依然是每天可以免费调用50次,本篇是基于此接口,…...
uniapp 基于xgplayer(西瓜视频) + renderjs开发,实现APP视频播放
背景:在uniapp中因原生video组件功能有限,选择引入xgplayer库来展示视频播放等功能。并且APP端无法操作dom,所以使用了renderjs。 其他的不多说,主要列举一下renderjs中需要注意的点: 1、使用:在标签后&…...
[1111].集成开发工具Pycharm安装与使用
所有博客大纲 后端学习大纲 Python大纲 1.下载: 官方下载地址 2.安装: 1.双击exe文件,然后下一步选择安装目录 2.选择桌面快捷方式及安装: 3.安装完成 3.启动: 4.设置: 4.1.设置运行时环境:…...
【玩转OCR】 | 腾讯云智能结构化OCR在多场景的实际应用与体验
文章目录 引言产品简介产品功能产品优势 API调用与场景实践图像增强API调用实例发票API调用实例其他场景 结语相关链接 引言 在数字化信息处理的时代,如何高效、精准地提取和结构化各类文档数据成为了企业和政府部门的重要需求。尤其是在面对海量票据、证件、表单和…...
红狮金业:2024年尾声,黄金市场需要关注的消息面
随着2024年的尾声渐近,全球金融市场在美联储的年度最后一次降息决策中迎来了新的波澜。上周,美联储宣布降息,而美联储主席鲍威尔随后的发言更是在市场上掀起了巨大波动。他透露,美联储计划在明年放缓降息步伐,可能仅实…...
使用BCrypt进行密码加密
1. 添加依赖: 在pom.xml文件中添加Spring Security依赖,以使用BCryptPasswordEncoder。 <!-- Spring Security 依赖 --> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-secu…...
《智启新材热学性能优化之路:人工智能的神奇力量》
在当今科技飞速发展的时代,材料科学与人工智能的融合正成为创新的前沿阵地。其中,利用人工智能优化材料的热学性能,为众多领域的突破带来了新的曙光,从航空航天的高效热防护到电子设备的散热管理,其影响深远且广泛&…...
IIC驱动EEPROM
代码参考正点原子 i2c_dri:主要是三段式状态机的编写 module iic_dri#(parameter SLAVE_ADDR 7b1010000 , //EEPROM从机地址parameter CLK_FREQ 26d50_000_000, //模块输入的时钟频率parameter I2C_FREQ 18d250_000 //IIC_SCL的时钟频率)( …...
目标检测——基于yolov8和pyqt的螺栓松动检测系统
目录 1.项目克隆和环境配置1.1 我这里使用的是v8.0.6版本1.2 项目代码结构介绍 2.数据集介绍2.1 数据集采集2.2采集结果介绍 3.模型训练4.pyqt界面设计4.1 界面内容介绍4.2 界面实现 5.操作中的逻辑实现5.1 图片检测5.2 文件夹检测5.3 视频检测和摄像头检测 6. 效果展示 1.项目…...
JVM系列(十三) -常用调优工具介绍
最近对 JVM 技术知识进行了重新整理,再次献上 JVM系列文章合集索引,感兴趣的小伙伴可以直接点击如下地址快速阅读。 JVM系列(一) -什么是虚拟机JVM系列(二) -类的加载过程JVM系列(三) -内存布局详解JVM系列(四) -对象的创建过程JVM系列(五) -对象的内存分…...
酷睿i7和i5哪个好?i5和i7的区别介绍
在英特尔酷睿处理器家族中,i7与i5作为面向不同用户群体的主流产品,各自承载着不同的性能定位与使用价值。在面对“酷睿i7和i5哪个好”的问题时,答案并非一概而论,而是取决于具体的应用需求、预算考量以及对性能与效率的期待。本文…...
实现用户登录系统的前后端开发
**一、**实验名称 实现用户登录系统的前后端开发。 **二、**参考资料 Web开发技术第一章课件。 **三、**实验目的 1.练习前端基本技术的使用。 2.练习使用Servlet/JSP开发简单后端程序。 3.练习使用Tomcat发布Web应用。 4.练习使用Spring Boot开发简单的后端程序。 **…...
Log4j1.27配置日志输出级别不起效
起因:构建独立版本debezuim使用时,日志一直打印debug信息。 原因:包冲突问题,进行排包操作。 参考log4j日志级别配置完成后不生效 系统一直打印debug日志_log4j不起作用-CSDN博客 1、application.properties logging.configc…...
一、后端到摄像头(监控摄像头IOT)
前言: 开发流程从 后端到摄像头 打通是第一步,那么我们可以着手设计 后端实现 的具体步骤,确保能够稳定地接收和处理来自摄像头的视频流,并提供后续的功能扩展,如视频流转发、存储和控制。 1. 后端系统架构设计 在开始…...
H3C MPLS跨域optionB
实验拓扑 实验需求 如图,VPN1 和 VPN2 分别通过运营商 MPLS VPN 连接各自分支机构按照图示配置 IP 地址,VPN1 和 VPN2 连接同一个 PE 设备的私网 IP 网段存在地址复用,使用多 VRF 技术来防止 IP 冲突AS 100 和 AS 200 内部的公共网络中各自运行 OSPF 使 AS 内各设备的 Loo…...
微信小程序中momentjs无法切换中文问题处理
微信小程序中momentj.s无法切换中文问题处理. 表现为 使用 locale(“zh-cn”)无效。 处理方法 # 1、先删除 miniprogram_npm\moment\index.js # 2、将 node_modules\moment\min\moment-with-locales.min.js 复制到 miniprogram_npm\moment下 并重命名为index.js # 3、修改mi…...
Linux零基础速成篇一(理论+实操)
前言:本教程适合Linux零基础学习,也适合Linux期末考试的小伙伴,从头到尾理论与实操相结合,让你快速对Linux进行了解和掌握。 一、Linux概述 为什么要学习Linux操作系统? 完全免费-开源 任何用户均可下载使用 安全…...
【087】基于51单片机智能宠物喂食器【Proteus仿真+Keil程序+报告+原理图】
☆、设计硬件组成:51单片机最小系统LCD1602液晶显示MY1680语音播放模块DS1302时钟芯片SG90舵机AT24C02存储芯片LED灯按键设置。 1、设计采用STC89C52、AT89C52、AT89S52作为主控芯片; 2、系统采用DS1302时钟芯片实现对日期时间计时并通过LCD1602液晶显…...
全局流量管理:提升用户体验与保障服务稳定性
全局流量管理的工作原理 全局流量管理主要依赖于 DNS(域名系统)技术,通过将全球用户的访问请求引导至离他们最近的服务器或数据中心,极大减少了访问延迟和带宽负载,从而提升了用户体验。同时,GTM 还结合健…...
达梦数据守护搭建
主备库初始化 ./dminit path/dmdata/data db_nameDM01 instance_nameDMSVR01 port_num5236 page_size16 extent_size32 log_size500 case_sensitive1 SYSDBA_PWDDM01SYSDBA ./dminit path/dmdata/data db_nameDM02 instance_nameDMSVR02 port_num5236 page_size16 extent_size3…...
【C++】容器适配器全知道
亲爱的读者朋友们😃,此文开启知识盛宴与思想碰撞🎉。 快来参与讨论💬,点赞👍、收藏⭐、分享📤,共创活力社区。 目录 一、前言 二、什么是适配器 (一)现实生…...
企业资源规划系统(ERP)服务器上线项目实施指南
企业资源规划系统(ERP)服务器上线项目实施指南 项目背景 在当今竞争激烈的商业环境中,企业资源规划系统(ERP)已成为企业提升内部管理效率和响应市场变化的重要工具。为了实现业务流程的整合和优化,公司决…...
Kubernetes(k8s)离线部署DolphinScheduler3.2.2
1.环境准备 1.1 集群规划 本次安装环境为:3台k8s现有的postgreSql数据库zookeeper服务 1.2 下载及介绍 DolphinScheduler-3.2.2官网:https://dolphinscheduler.apache.org/zh-cn/docs/3.2.2 官网安装文档:https://dolphinscheduler.apach…...
嵌入式linux驱动框架 从0到1编写设备驱动 i2c_driver i2c_client
一、编写最简单的IIC驱动框架 在之前的文章中,我们已经深入探讨了I2C通信的基本原理、硬件架构以及时序等基础知识。现在,我们将进入一个更加实际和深入的层面,构建一个完整的I2C驱动程序,并阐述驱动框架的设计。这将帮助你不仅能…...
小程序canvas画环形百分比进度图
组件封装 component/canvas-ring目录下 canvas-ring.js <canvas style"width:{{canvasWidth}}px;height:{{canvasWidth}}px; margin:0 auto;position:relative" type"2d" id"myCanvas"><view class"circle-bar" style&quo…...
面试经验分享 | 北京渗透测试岗位
更多大厂面试经验的视频经验分享看主页 目录: 所面试的公司:安全大厂 所在城市:北京 面试职位:渗透测试工程师 面试方式:腾讯会议线上面试线下面试 面试过程: 面试官的问题: 1、说一下XSS有哪…...
安卓 SystemServer 启动流程
目录 引言 Android系统服务启动顺序 zygote fork SystemServer 进程 SystemServer启动流程 1、SystemServer.main() 2、SystemServer.run() 3、初始化系统上下文 4、创建系统服务管理 5、启动系统各种服务 总结 引言 开机启动时 PowerManagerService 调用 AudioSer…...
opencv存图速度测试
以下测试的图片,均为5488x3672分辨率的三通道彩色图。 分别使用opencv和halcon存图,测试速度,存100次取平均值,结果如下: image size:5488 3672 opencv jpg save time 0.12809s opencv bmp save time 0.02197s hal…...
[ffmpeg]编译 libx264
步骤 下载 libx264 git clone https://code.videolan.org/videolan/x264.git cd x264环境搭建 然后在开始菜单中找到并打开 x64 Native Tools Command Prompt for VS 2019 : 打开 msys2_shell.cmd -use-full-path 这时会打开 MSYS 的新窗口,先把一些汇…...
常见API
1.API 1.1API概述 什么是API API (Application Programming Interface) :应用程序编程接口 java中的API 指的就是 JDK 中提供的各种功能的 Java类,这些类将底层的实现封装了起来,我们不需要关心这些类是如何实现的,只需要…...
vscode写python,遇到问题:ModuleNotFoundError: No module named ‘pillow‘(已解决 避坑)
1 问题: ModuleNotFoundError: No module named pillow 2 原因: 原因1:安装Pillow的pip命令所处的python版本与vscode调用的python解释器版本不同。 如: 原因2:虽然用的是pillow,但是写代码的时候只能用…...
【mysql】id主键列乱了之后,重新排序(可根据日期顺序)
一、ID中断不连续的,重新设置为连续的ID alter table table_name drop id; alter table table_name add id int not null first; alter table table_name modify column id int not null auto_increment, add primary key(id); select * from table_name order by …...
SO-CNN-LSTM-MATT蛇群算法优化注意力机制深度学习多特征分类预测
SO-CNN-LSTM-MATT蛇群算法优化注意力机制深度学习多特征分类预测(多输入单输出) 目录 SO-CNN-LSTM-MATT蛇群算法优化注意力机制深度学习多特征分类预测(多输入单输出)分类效果基本描述程序设计参考资料 分类效果 基本描述 1.Matl…...
原点安全再次入选信通院 2024 大数据“星河”案例
近日,中国信息通信研究院和中国通信标准化协会大数据技术标准推进委员会(CCSA TC601)共同组织开展的 2024 大数据“星河(Galaxy)”案例征集活动结果正式公布。由工银瑞信基金管理有限公司、北京原点数安科技有限公司联…...
Hadoop
HDFS一键启动/停止 start-dfs.sh stop-dfs.sh 单进程启动/停止 $HADOOP_HOME/bin/hdfs,此程序也可以用以单独控制所在机器的进程的启停 hdfs --daemon (start|status|stop) (namenode|secondarynamenode|datanode)#例如启动namenode,去到需要启动的服…...
【Ambari】使用 Knox 进行 LDAP 身份认证
目录 一、knox介绍 二、Ambari配置LDAP认证 三、验证Knox网关 3.1YARNUI 3.2 HDFSUI 3.3 HDFS RestFULL 3.4 SparkHistoryserver 3.5 HBASEUI 一、knox介绍 Apache Knox网关是一个用于与Apache Hadoop部署的REST api和ui交互的应用程序网关。Knox网关为所有与Apache Ha…...
计算机网络习题( 第3章 物理层 第4章 数据链路层 )
第3章 物理层 一、单选题 1、下列选项中,不属于物理层接口规范定义范畴的是( )。 A、 接口形状 B、 引脚功能 C、 传输媒体 D、 信号电平 正确答案: C 2、在物理层接口特性中,用于描述完成每种功能的事件发…...