Flink链接Kafka
一、基于 Flink 的 Kafka 消息生产者
- Kafka 生产者的创建与配置:
- 代码通过
FlinkKafkaProducer
创建 Kafka 生产者,用于向 Kafka 主题发送消息。
- 代码通过
- Flink 执行环境的配置:
- 配置了 Flink 的检查点机制,确保消息的可靠性,支持"精确一次"的消息交付语义。
- 模拟数据源:
- 通过
env.fromElements()
方法创建了简单的消息流,发送了三条消息"a"
,"b"
, 和"c"
。
- 通过
package com.example.kafka_flink.service;import com.example.kafka_flink.util.MyNoParalleSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.util.Properties;
@Service
public class SimpleKafkaProducer {public static void main(String[] args) throws Exception {// 创建 SimpleKafkaProducer 的实例SimpleKafkaProducer kafkaProducer = new SimpleKafkaProducer();// 调用 producer 方法kafkaProducer.producer();}public void producer() throws Exception {// 设置 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置检查点机制,设置检查点模式为 "Exactly Once"(精确一次)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 启用检查点机制,设置检查点时间间隔为 5000 毫秒(5 秒)env.enableCheckpointing(5000);// 配置 Kafka 属性,包括身份验证信息Properties properties = new Properties();properties.setProperty("bootstrap.servers", "xxxx");properties.setProperty("security.protocol", "SASL_PLAINTEXT");properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxx\" password=\"xxx\";");// 创建 Kafka 生产者实例,并设置目标主题和序列化模式FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("WJ-TEST",// 使用 SimpleStringSchema 进行字符串序列化new SimpleStringSchema(),properties);// 模拟数据源,生产一些简单的消息,并将消息写入 Kafkaenv.fromElements("a", "b", "c").addSink(producer);// 启动 Flink 作业env.execute("Kafka Producer Job");}
}
二、基于 Flink 的 Kafka 消息消费者
2.1 消费一个Topic
-
设置 Flink 执行环境:
- 使用
StreamExecutionEnvironment.getExecutionEnvironment()
创建执行环境。
- 使用
-
启用检查点机制:
- 调用
env.enableCheckpointing(5000)
,设置检查点时间间隔为 5 秒。 - 配置检查点模式为
EXACTLY_ONCE
,确保数据一致性。
- 调用
-
配置 Kafka 属性:
- 设置 Kafka 服务器地址(
bootstrap.servers
)。 - 指定消费组 ID(
group.id
)。 - 配置安全协议和认证机制(
SASL_PLAINTEXT
和SCRAM-SHA-512
)。
- 设置 Kafka 服务器地址(
-
创建 Kafka 消费者:
- 使用
FlinkKafkaConsumer<String>
指定单个 Kafka Topic(如"WJ-TEST"
)。 - 设置消息反序列化方式为
SimpleStringSchema
。 - 配置消费者从最早偏移量开始消费(
setStartFromEarliest()
)。
- 使用
-
将 Kafka 消费者添加到 Flink 数据流:
- 调用
env.addSource(consumer)
添加 Kafka 消费者作为数据源。 - 使用
FlatMapFunction
处理消息,将其打印或进一步处理。
- 调用
-
启动 Flink 作业:
- 使用
env.execute("start consumer...")
启动 Flink 作业,开始消费 Kafka 的消息流。
- 使用
//消费单个topicpublic static void consumerOneTopic() throws Exception {// 设置 Flink 执行环境// 创建一个流处理的执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 启用检查点机制,设置检查点的时间间隔为 5000 毫秒(5 秒)env.enableCheckpointing(5000);// 配置检查点模式为 "Exactly Once"(精确一次)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 配置 Kafka 属性,包括身份验证信息Properties properties = new Properties();// 设置 Kafka 集群地址properties.setProperty("bootstrap.servers", "xxxx");// 设置消费组 ID,用于管理消费偏移量properties.setProperty("group.id", "group_test");// 设置安全协议为 SASL_PLAINTEXTproperties.setProperty("security.protocol", "SASL_PLAINTEXT");// 设置 SASL 认证机制为 SCRAM-SHA-512properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");// 配置 SASL 登录模块,包含用户名和密码properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxx\" password=\"xxx\";");// 创建一个 Kafka 消费者实例FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(// 设置要消费的 Kafka 主题名称"WJ-TEST",// 使用 SimpleStringSchema 将 Kafka 的消息反序列化为字符串new SimpleStringSchema(),// 传入 Kafka 的配置属性properties);// 设置消费者从 Kafka 的最早偏移量开始消费消息consumer.setStartFromEarliest();// 将 Kafka 消费者作为数据源添加到 Flink 的执行环境中env.addSource(consumer).flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String s, Collector<String> collector) throws Exception {// 打印消费到的消息内容到控制台System.out.println(s);// 收集消费到的消息,供后续处理collector.collect(s);}});// 启动并执行 Flink 作业,作业名称为 "start consumer..."env.execute("start consumer...");}
生产消息结果:
2.2 消费多个Topic
-
设置 Flink 执行环境:
- 使用
StreamExecutionEnvironment.getExecutionEnvironment()
创建执行环境。
- 使用
-
启用检查点机制:
- 配置检查点模式为
EXACTLY_ONCE
,确保数据一致性。 - 调用
env.enableCheckpointing(5000)
设置检查点时间间隔为 5 秒。
- 配置检查点模式为
-
配置 Kafka 属性:
- 设置 Kafka 服务器地址(
bootstrap.servers
)。 - 指定消费组 ID(
group.id
)。 - 配置安全协议和认证机制(
SASL_PLAINTEXT
和SCRAM-SHA-512
)。
- 设置 Kafka 服务器地址(
-
定义 Kafka Topic 列表:
- 创建一个
List<String>
,添加多个 Kafka Topic 名称(如"WJ-TEST"
和"KAFKA_TEST_001"
)。
- 创建一个
-
创建 Kafka 消费者:
- 使用
FlinkKafkaConsumer
,传入 Kafka Topic 列表和自定义反序列化器(CustomDeSerializationSchema
)。 - 配置消费者从最早偏移量开始消费(
setStartFromEarliest()
)。
- 使用
-
将 Kafka 消费者添加到 Flink 数据流:
- 调用
env.addSource(consumer)
添加 Kafka 消费者作为数据源。 - 使用
FlatMapFunction
处理消息,打印消息的 Topic、分区、偏移量、键和值,并收集消息值进行进一步处理。
- 调用
-
启动 Flink 作业:
- 使用
env.execute("start consumer...")
启动 Flink 作业,开始消费 Kafka 的多个主题消息流。
- 使用
//消费多个topicpublic static void consumerTopics() throws Exception {// 设置 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置检查点机制,设置检查点模式为 "Exactly Once"(精确一次)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 启用检查点机制,设置检查点时间间隔为 5000 毫秒(5 秒)env.enableCheckpointing(5000);// 配置 Kafka 属性,包括身份验证信息Properties properties = new Properties();properties.setProperty("bootstrap.servers", "xxxx");properties.setProperty("group.id", "group_test");properties.setProperty("security.protocol", "SASL_PLAINTEXT");properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxx\" password=\"xxx\";");// 定义需要消费的 Kafka 主题列表List<String> topics = new ArrayList<>();topics.add("WJ-TEST");topics.add("KAFKA_TEST_001");// 使用自定义反序列化器创建 Kafka 消费者实例FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer = new FlinkKafkaConsumer<>(topics,new CustomDeSerializationSchema(),properties);// 设置消费者从 Kafka 的最早偏移量开始消费消息consumer.setStartFromEarliest();// 将 Kafka 消费者作为数据源添加到 Flink 的执行环境中env.addSource(consumer).flatMap(new FlatMapFunction<ConsumerRecord<String, String>, Object>() {@Overridepublic void flatMap(ConsumerRecord<String, String> record, Collector<Object> collector) throws Exception {// 打印消费到的消息内容到控制台System.out.println("Topic: " + record.topic() +", Partition: " + record.partition() +", Offset: " + record.offset() +", Key: " + record.key() +", Value: " + record.value());// 收集消费到的消息,供后续处理collector.collect(record.value());}});// 启动并执行 Flink 作业env.execute("start consumer...");}
2.3 消费Topic的总体代码
package com.example.kafka_flink.service;import com.example.kafka_flink.util.CustomDeSerializationSchema;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;/*** @author wangjian*/
@Service
public class SimpleKafkaConsumer {public static void main(String[] args) throws Exception {
// SimpleKafkaConsumer.consumerOneTopic();SimpleKafkaConsumer.consumerTopics();}//消费单个topicpublic static void consumerOneTopic() throws Exception {// 设置 Flink 执行环境// 创建一个流处理的执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 启用检查点机制,设置检查点的时间间隔为 5000 毫秒(5 秒)env.enableCheckpointing(5000);// 配置检查点模式为 "Exactly Once"(精确一次)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 配置 Kafka 属性,包括身份验证信息Properties properties = new Properties();// 设置 Kafka 集群地址properties.setProperty("bootstrap.servers", "xxxx");// 设置消费组 ID,用于管理消费偏移量properties.setProperty("group.id", "group_test");// 设置安全协议为 SASL_PLAINTEXTproperties.setProperty("security.protocol", "SASL_PLAINTEXT");// 设置 SASL 认证机制为 SCRAM-SHA-512properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");// 配置 SASL 登录模块,包含用户名和密码properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxx\" password=\"xxx\";");// 创建一个 Kafka 消费者实例FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(// 设置要消费的 Kafka 主题名称"WJ-TEST",// 使用 SimpleStringSchema 将 Kafka 的消息反序列化为字符串new SimpleStringSchema(),// 传入 Kafka 的配置属性properties);// 设置消费者从 Kafka 的最早偏移量开始消费消息consumer.setStartFromEarliest();// 将 Kafka 消费者作为数据源添加到 Flink 的执行环境中env.addSource(consumer).flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String s, Collector<String> collector) throws Exception {// 打印消费到的消息内容到控制台System.out.println(s);// 收集消费到的消息,供后续处理collector.collect(s);}});// 启动并执行 Flink 作业,作业名称为 "start consumer..."env.execute("start consumer...");}//消费多个topicpublic static void consumerTopics() throws Exception {// 设置 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置检查点机制,设置检查点模式为 "Exactly Once"(精确一次)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 启用检查点机制,设置检查点时间间隔为 5000 毫秒(5 秒)env.enableCheckpointing(5000);// 配置 Kafka 属性,包括身份验证信息Properties properties = new Properties();properties.setProperty("bootstrap.servers", "xxxx");properties.setProperty("group.id", "group_test");properties.setProperty("security.protocol", "SASL_PLAINTEXT");properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxx\" password=\"xxx\";");// 定义需要消费的 Kafka 主题列表List<String> topics = new ArrayList<>();topics.add("WJ-TEST");topics.add("KAFKA_TEST_001");// 使用自定义反序列化器创建 Kafka 消费者实例FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer = new FlinkKafkaConsumer<>(topics,new CustomDeSerializationSchema(),properties);// 设置消费者从 Kafka 的最早偏移量开始消费消息consumer.setStartFromEarliest();// 将 Kafka 消费者作为数据源添加到 Flink 的执行环境中env.addSource(consumer).flatMap(new FlatMapFunction<ConsumerRecord<String, String>, Object>() {@Overridepublic void flatMap(ConsumerRecord<String, String> record, Collector<Object> collector) throws Exception {// 打印消费到的消息内容到控制台System.out.println("Topic: " + record.topic() +", Partition: " + record.partition() +", Offset: " + record.offset() +", Key: " + record.key() +", Value: " + record.value());// 收集消费到的消息,供后续处理collector.collect(record.value());}});// 启动并执行 Flink 作业env.execute("start consumer...");}}
2.4 自定义的 Kafka 反序列化器 (CustomDeSerializationSchema
)
实现了一个自定义的 Kafka 反序列化器 (CustomDeSerializationSchema
),主要功能是将从 Kafka 中消费到的消息(字节数组格式)解析为包含更多元数据信息的 ConsumerRecord<String, String>
对象。以下是其作用的具体说明:
-
解析 Kafka 消息:
- 消息的
key
和value
由字节数组转换为字符串格式,便于后续业务逻辑处理。 - 同时保留 Kafka 消息的元数据信息(如主题名称
topic
、分区号partition
、偏移量offset
)。
- 消息的
-
扩展 Flink 的 Kafka 数据处理能力:
- 默认的反序列化器只处理消息内容(
key
或value
),而该自定义类将消息的元数据(如topic
和partition
)也作为输出的一部分,为复杂业务需求提供了更多上下文信息。
- 默认的反序列化器只处理消息内容(
-
控制流数据的结束逻辑:
- 实现了
isEndOfStream
方法,返回false
,表示 Kafka 的数据流是持续的,Flink 不会主动终止数据消费。
- 实现了
-
定义 Flink 数据类型:
- 使用
getProducedType
方法,明确告诉 Flink 输出的数据类型是ConsumerRecord<String, String>
,便于 Flink 在运行时正确处理流数据。
- 使用
package com.example.kafka_flink.util;import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;import java.nio.charset.StandardCharsets;/*** @author wangjian*/
public class CustomDeSerializationSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {// 是否表示流的最后一条元素// 返回 false,表示数据流会源源不断地到来,Flink 不会主动停止消费@Overridepublic boolean isEndOfStream(ConsumerRecord<String, String> stringStringConsumerRecord) {return false;}// 反序列化方法// 将 Kafka 消息从字节数组转换为 ConsumerRecord<String, String> 类型的数据// 返回的数据不仅包括消息内容(key 和 value),还包括 topic、offset 和 partition 等元数据信息@Overridepublic ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {// 检查 key 和 value 是否为 null,避免空指针异常String key = consumerRecord.key() == null ? null : new String(consumerRecord.key(), StandardCharsets.UTF_8);String value = consumerRecord.value() == null ? null : new String(consumerRecord.value(), StandardCharsets.UTF_8);// 构造并返回一个 ConsumerRecord 对象,其中包含反序列化后的 key 和 value,以及其他元数据信息return new ConsumerRecord<>(// Kafka 主题名称consumerRecord.topic(),// 分区号consumerRecord.partition(),// 消息偏移量consumerRecord.offset(),// 消息的 keykey,// 消息的 valuevalue);}// 指定数据的输出类型// 告诉 Flink 消费的 Kafka 数据类型是 ConsumerRecord<String, String>@Overridepublic TypeInformation<ConsumerRecord<String, String>> getProducedType() {return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {});}
}
2.5 消费到消息的结果
相关文章:
Flink链接Kafka
一、基于 Flink 的 Kafka 消息生产者 Kafka 生产者的创建与配置: 代码通过 FlinkKafkaProducer 创建 Kafka 生产者,用于向 Kafka 主题发送消息。Flink 执行环境的配置: 配置了 Flink 的检查点机制,确保消息的可靠性,支…...
Maven 配置本地仓库
步骤 1:修改 Maven 的 settings.xml 文件 找到你的 Maven 配置文件 settings.xml。 Windows: C:\Users\<你的用户名>\.m2\settings.xmlLinux/macOS: ~/.m2/settings.xml 打开 settings.xml 文件,找到 <localRepository> 标签。如果没有该标…...
【PHP】双方接口通信校验服务
请求方 使用 ApiAuthService::buildUrl($domain, [terminal => 1, ts => time()]); //http://域名/adminapi/login/platformLogin?sign=F7FE8A150DEC18BE8A71C5059742C81A&terminal=1&ts=1736904841接收方 $getParams = $this->request->get();$validate…...
mac 安装docker
1、下载docker 进入 /Applications/Docker.app/Contents/MacOS/Docker Desktop.app/Contents/Resources目录 把app.asar 文件备份 将下载的中文包复制进去。修改成一样的名字 [汉化包下载地址](https://github.com/asxez/DockerDesktop-CN)...
ANSYS Fluent学习笔记(七)求解器四部分
16.亚松弛因子 Controls面板里面设置,它能够稳定计算的过程。如果采用常规的迭代算法可能结果就会发生振荡的情况。采用亚松驰因子可以有助于残差的稳定。 他的取值范围是0-1,0代表没有亚松驰,1表示物理量变化很快,一般情况下取…...
【微服务】面试 3、 服务监控 SkyWalking
微服务监控的原因 问题定位:在微服务架构中,客户端(如 PC 端、APP 端、小程序等)请求后台服务需经过网关再路由到各个微服务,服务间可能存在多链路调用。当某一微服务挂掉时,在复杂的调用链路中难以迅速确定…...
llamafactory使用8张昇腾910b算力卡lora微调训练qwen2-72b大模型
说明 我需要在昇腾服务器上对Qwen2-72B大模型进行lora微调,改变其自我认知。 我的环境下是8张910B1卡。显存约512GB。 准备:安装llamafactory 请参考官方方法安装llamafactory:https://github.com/hiyouga/LLaMA-Factory 特别强调下&…...
在服务器上增加新网段IP的路由配置
在服务器上增加新网段IP的路由配置 前提条件步骤一:检查当前路由表步骤二:添加新路由步骤三:验证新路由步骤四:持久化路由配置脚本示例结论在网络管理中,路由配置是一项基本且重要的任务。它决定了数据包在网络中的传输路径。本文将详细介绍如何在服务器上增加新的路由配置…...
2Spark Core
2Spark Core 1.RDD 详解1) 为什么要有 RDD?2) RDD 是什么?3) RDD 主要属性 2.RDD-API1) RDD 的创建方式2) RDD 的算子分类3) Transformation 转换算子4) Action 动作算子 3. RDD 的持久化/缓存4. RDD 容错机制 Checkpoint5. RDD 依赖关系1) 宽窄依赖2) 为什么要设计宽窄依赖 …...
【ANGULAR网站开发】初始环境搭建(SpringBoot)
1. 初始化SpringBoot 1.1 创建SpringBoot项目 清理spring-boot-starter-test,有需要的可以留着 1.2 application.properties 将application.properties改为yaml,个人习惯问题,顺便设置端口8888,和前端设置的一样 server:por…...
Vue 页面布局组件-Vuetify、Semantic
在现代 Web 开发中,用户体验是关键,尤其是当我们利用 Vue.js 框架构建用户友好的界面时。今天,我们将深入探讨如何使用 Vuetify 和 Semantic UI 来创建高效、美观的页面布局组件。通过这项技术,你将能够为用户呈现一个流畅的交互体…...
小程序组件 —— 31 事件系统 - 事件绑定和事件对象
小程序中绑定事件和网页开发中绑定事件几乎一致,只不过在小程序不能通过 on 的方式绑定事件,也没有 click 等事件,小程序中绑定事件使用 bind 方法,click 事件也需要使用 tap 事件来进行代替,绑定事件的方式有两种&…...
23种设计模式
23种设计模式 创建型模式(Creational Patterns)结构型模式(Structural Patterns)行为型模式(Behavioral Patterns)总结 Java中的设计模式是解决特定问题的通用、可复用的解决方案。它们不是完成代码&#x…...
SIBR详细介绍:基于图像的渲染系统及3DGS实例展示【3DGS实验复现】
文章目录 什么是 SIBR?IBR 技术的优势SIBR 的核心组件SIBR 的应用场景如何使用 SIBR?3D Gaussian Splatting 实验实例展示1. 什么是 3D Gaussian Splatting (3DGS)?2. 实验运行环境步骤:简要说明如何使用 3DGS 的两种渲染方式 3. …...
每天五分钟深度学习框架pytorch:基于vgg块搭建VGG卷积神经网络
本文重点 前面我们使用pytorch搭建了vgg块,本文我们使用vgg块搭建卷积神经网络VGG16,我们先来看一下vgg16的模型结构是什么样的: 搭建vgg16 import torch from torch import nn def vgg_block(num_convs,in_channels,out_channels): net=[nn.Conv2d(in_channels,out_channe…...
【gin】中间件使用之jwt身份认证和Cors跨域,go案例
Gin-3 中间件编程及 JWT 身份认证 1. Gin 中间件概述 中间件是处理 HTTP 请求的函数,可以在请求到达路由处理函数之前或之后对请求进行处理。 在 Gin 框架中,中间件常用于处理日志记录、身份验证、权限控制等功能。 router : gin.Default() router.Us…...
探索 Vue.js 组件开发的新边界:动态表单生成技术
随着前端技术的飞速发展,Vue.js 作为一款灵活、易用且性能优异的框架,一直是开发者心中的不二之选。本文将深入介绍 Vue.js 组件开发中的最新技术之一:动态表单生成技术,并通过具体实例展示如何实现这一高效技术。 为什么选择动态…...
Android 调用系统服务接口获取屏幕投影(需要android.uid.system)
媒体投影 借助 Android 5(API 级别 21)中引入的 android.media.projection API,您可以将设备屏幕中的内容截取为可播放、录制或投屏到其他设备(如电视)的媒体流。 Android 14(API 级别 34)引入…...
Node.js - Express框架
1. 介绍 Express 是一个基于 Node.js 的 Web 应用程序框架,主要用于快速、简便地构建 Web 应用程序 和 API。它是目前最流行的 Node.js Web 框架之一,具有轻量级、灵活和功能丰富的特点。 核心概念包括路由,中间件,请求与响应&a…...
Picocli 命令行框架
官方文档 https://picocli.info/ 官方提供的快速入门教程 https://picocli.info/quick-guide.html 使用 Picocli 创建命令行应用程序 Picocli 是一个用于构建 Java 命令行应用的强大框架,它简化了参数解析和帮助消息生成的过程。 下面是如何使用 Picocli 构建简单命…...
Vscode——SSH连接不上的一种解决办法
一、完整报错: > @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ > IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY! > Someone could be eavesdropping on you right now (man-in-the...
Stream流
一 : Stream流的介绍 stream不存储数据,而是按照特定的规则对数据进行计算,一般会输出结果; stream不会改变数据源,通常情况下会产生一个新的集合; stream具有延迟执行特性,只有调用终端操作时ÿ…...
开源文件存储分享平台Seafile部署与应用
Seafile 是一款开源的企业云盘,注重可靠性和性能,支持全平台客户端。Seafile 内置协同文档 SeaDoc ,让协作撰写、管理和发布文档更便捷。适用于团队协作、文件存储和同步的开源解决方案,它提供了可靠、安全和易用的云存储服务。主要有以下特点: 文件存储和同步:Seafile 允…...
RAG技术:是将知识库的文档和问题共同输入到LLM中
RAG技术 RAG技术是将知识库的文档和问题共同输入到LLM中 RAG技术是先从知识库中检索出与问题相关的文档片段,然后将这些检索到的文档片段与问题一起输入到LLM中进行回答。具体过程如下: 文本分块 由于LLM的上下文窗口有限,需要将长文本资料分割成较小的块,以便LLM能够有…...
战略与规划方法——深入解析波士顿矩阵(BCG Matrix):分析产品组合的关键工具
深入解析波士顿矩阵(BCG Matrix):分析产品组合的关键工具 在现代商业管理中,合理地分析和管理产品组合对于企业的成功至关重要。波士顿矩阵(BCG Matrix),又称为成长份额矩阵,是一种由波士顿咨询集团(Boston Consulting Group)在20世纪70年代提出的战略工具,用于帮助…...
GORM(Go语言数据交互库)
GORM(Go ORM,即对象关系映射)是Go语言中非常流行且功能强大的数据库交互库。它简化了与关系型数据库的交互过程,提供了丰富的API来处理各种数据库操作。下面将详细介绍GORM的功能、使用方法和一些高级特性。 1. 安装 首先&#…...
Spring Boot教程之五十七:在 Apache Kafka 上发布 JSON 消息
Spring Boot | 如何在 Apache Kafka 上发布 JSON 消息 Apache Kafka是一个发布-订阅消息系统。消息队列允许您在进程、应用程序和服务器之间发送消息。在本文中,我们将了解如何在 Spring Boot 应用程序中向 Apache Kafka 发送 JSON 消息。 为了了解如何创建 Spring…...
开发指南091-延迟退休算法
公布平台上人力资源系统有关延迟退休算法: package org.qlm.util;public class busiUtil {/*birthYearMonth 出生年月 yyyy-MMmode 0 男职工 1 女干部 2 女职工*/public static String calculateRetirementDate(String birthYearMonth, String mode){if ("0&…...
Flask-SQLAlchemy 基于一个base表 - 动态创建使用相同字段的其他业务表
1 安装 首先,确保您安装了 Flask 和 SQLAlchemy,以及 MySQL 的驱动程序(例如 mysql-connector-python 或 PyMySQL): pip install Flask Flask-SQLAlchemy mysql-connector-python2 创建项目结构 创建一个简单的项目…...
数据结构--二叉树
目录 有序二叉树: 平衡二叉树: 234树: 红黑树 红黑树特点: 为什么红黑树是最优二叉树? 哈夫曼树和哈夫曼编码 有序二叉树: 平衡二叉树: 在有序二叉树的基础上得来的,且左右子…...
【编译构建】用cmake编译libjpeg动态库并实现转灰度图片
先编译出libjepg动态库 1、下载libjpeg源码: https://github.com/libjpeg-turbo/libjpeg-turbo 2、编译出动态库或静态库 写一个编译脚本,用cmake构建。 #!/bin/bash# 定义变量 SOURCE_DIR"/home/user/libjpeg-turbo-main" BUILD_DIR"${SOURCE_…...
vLLM私有化部署大语言模型LLM
目录 一、vLLM介绍 二、安装vLLM 1、安装环境 2、安装步骤 三、运行vLLM 1、运行方式 2、切换模型下载源 3、运行本地已下载模型 四、通过http访问vLLM 一、vLLM介绍 vLLM(官方网址:https://www.vllm.ai)是一种用于大规模语言模型&#x…...
人工智能任务19-基于BERT、ELMO模型对诈骗信息文本进行识别与应用
大家好,我是微学AI,今天给大家介绍一下人工智能任务19-基于BERT、ELMO模型对诈骗信息文本进行识别与应用。近日,演员王星因接到一份看似来自知名公司的拍戏邀约,被骗至泰国并最终被带到缅甸。这一事件迅速引发了社会的广泛关注。该…...
ESP-IDF学习记录(5) 画一块esp32-c3 PCB板
最近看了半个多月,趁着嘉立创官方活动,研究esp32-c3规格书,白嫖PCB 和元器件。原本计划按照官方推荐的搞个四层板,结果打样太贵,火速改成双层板,用了官方的券。小于10*10,也可以使用嘉立创的免费打样。 下面…...
Day04-后端Web基础——Maven基础
目录 Maven课程内容1. Maven初识1.1 什么是Maven?1.2 Maven的作用1.2.1 依赖管理1.2.2 项目构建1.2.3 统一项目结构 2. Maven概述2.1 Maven介绍2.2 Maven模型2.2.1 构建生命周期/阶段(Build lifecycle & phases)2.2.2 项目对象模型 (Project Object Model)2.2.3 依赖管理模…...
ASP.NET Core - 日志记录系统(一)
ASP.NET Core - 日志记录系统(一) 一、日志记录二、ASP.Net Core 的日志记录2.1. 日志记录系统的接入2.2 记录日志2.3 基本配置2.3.1 日志级别2.3.2 全局输出配置2.3.3 针对特定日志提供程序的配置2.3.6 显式设置2.3.4 配置筛选原理2.3.5 日志作用域 一、…...
Linux 各个服务启动命令
目录 redis后台启动rocketMq后台启动mongodb后台启动mysql后台启动 redis后台启动 ./redis-server ./redis.confrocketMq后台启动 #关闭Nameserver sh bin/mqshutdown namesrv #关闭Broker sh bin/mqshutdown broker #启动namesrv nohup sh bin/mqnamesrv -n 127.0.0.1:9876 …...
24-25-1-单片机开卷部分习题和评分标准
依据相关规定试卷必须按评分标准进行批改。 给分一定是宽松的,能给分一定给,如有疑问也可以向学院教务办申请查卷。 一部分学生期末成绩由于紧张或其他原因导致分数过低,也是非常非常遗憾的。 个人也是非常抱歉的。 开卷考试 简答题 第一…...
Apache Hop从入门到精通 第二课 Apache Hop 核心概念/术语
1、apache hop核心概念思维导图 虽然apache hop是kettle的一个分支,但是它的概念和kettle还是有一些区别的,下图是我根据官方文档梳理的appache hop的核心概念思维导图。 2、Tools(工具) 1)Hop Conf Hop Conf 是一个…...
网络安全 | Web安全常见漏洞和防护经验策略
关注:CodingTechWork 引言 OWASP (Open Web Application Security Project) Top 10是Web应用最常见的安全风险集合,帮助开发人员和安全专家识别和防止最严重的网络安全问题。以下是基于OWASP Top 10的Web安全防护经验策略与规则集。Web开发者必须对潜在…...
Unity 3D游戏开发从入门进阶到高级
本文精心整理了Unity3D游戏开发相关的学习资料,涵盖入门、进阶、性能优化、面试和书籍等多个维度,旨在为Unity开发者提供全方位、高含金量的学习指南.欢迎收藏。 学习社区 Unity3D开发者 这是一个专注于Unity引擎的开发者社区,汇聚了众多Un…...
浅谈云计算14 | 云存储技术
云存储技术 一、云计算网络存储技术基础1.1 网络存储的基本概念1.2云存储系统结构模型1.1.1 存储层1.1.2 基础管理层1.1.3 应用接口层1.1.4 访问层 1.2 网络存储技术分类 二、云计算网络存储技术特点2.1 超大规模与高可扩展性2.1.1 存储规模优势2.1.2 动态扩展机制 2.2 高可用性…...
No.1|Godot|俄罗斯方块复刻|棋盘和初始方块的设置
删掉基础图标新建assets、scenes、scripts文件夹 俄罗斯方块的每种方块都是由四个小方块组成的,很适合放在网格地图中 比如网格地图是宽10列,高20行 要实现网格的对齐和下落 Node2D节点 新建一个Node2D 添加2个TileMapLayer 一个命名为Board&…...
二 RK3568 固件中打开 ADB 调试
一 usb adb Android 系统,设置->开发者选项->已连接到计算机 打开,usb调试开关打开 通过 usb otg 口连接 开发上位机 (windows/linux) 上位机安装 adb 服务之后 , 通过 cmd/shell: #1 枚举设备 adb devices #2 进入 android shell adb shell # 3 验证上传下载…...
鸿蒙报错Init keystore failed: keystore password was incorrect
报错如下: > hvigor ERROR: Failed :entry:defaultSignHap... > hvigor ERROR: Tools execution failed. 01-13 16:35:55 ERROR - hap-sign-tool: error: Init keystore failed: keystore password was incorrect * Try the following: > The key stor…...
Java学习笔记(二十三)
1 CacheEvict CacheEvict是Spring框架中用于清空缓存的注解。以下是对CacheEvict注解的详细介绍: 1.1 作用 CacheEvict注解的主要作用是删除缓存中的数据。在方法执行后或执行前(根据配置),它可以清空指定的缓存项或整个缓存区…...
TIOBE编程语言排行靠前的编程语言的吉祥物
Python的吉祥物:小蟒蛇 Python语言的吉祥物是一只名叫"Pythonidae"(或简称"Py")的小蟒蛇。这个吉祥物由Tobias Kohn设计于2005年,它的形象借鉴了真实的蟒蛇,但加入了一些可爱和友善的特点。小蟒蛇…...
Redis集群部署详解:主从复制、Sentinel哨兵模式与Cluster集群的工作原理与配置
集群部署形式 1、主从复制1.1 工作机制1.2 配置实现1.3 优缺点1.4 部署形式1.5 主从复制优化 2、Sentinel 哨兵模式2.1 工作机制2.2 配置实现2.3 优缺点2.4 哨兵机制选举流程2.5 脑裂问题解决方案 3、Redis Cluster3.1 工作机制3.2 配置实现3.3 优缺点3.4 故障转移3.5 哈希槽为…...
Dubbo泛化调用
本文记录下利用dubbo泛化调用实现网关server收http请求,然后转发给dubbo服务,然后收到dubbo响应的功能原理。 关键点1:dubbo泛化调用。可根据(注册中心地址、接口名,方法名,参数类型)唯一确定一个dubbo服务…...
SpringBoot工程快速启动
1.问题导入 以后我们和前端开发人员协同开发,而前端开发人员需要测试前端程序就需要后端开启服务器,这就受制于后端开发人员。 为了摆脱这个受制,前端开发人员尝试着在自己电脑上安装 Tomcat 和 Idea ,在自己电脑上启动后端程序&a…...