Kafka-Connect源码分析
一、上下文
《Kafka-Connect自带示例》中我们尝试了零配置启动producer和consumer去生产和消费数据,那么它内部是如何实现的呢?下面我们从源码来揭开它神秘的面纱。
二、入口类有哪些?
从启动脚本(connect-standalone.sh)中我们可以获取到启动类为ConnectStandalone
# ......省略......exec $(dirname $0)/kafka-run-class.sh $EXTRA_ARGS org.apache.kafka.connect.cli.ConnectStandalone "$@"
此外我们在参数中还给了source和sink的配置文件,从这source文件中可以获取到入口类为:FileStreamSource,从sink文件中可以获取到入口类为:FileStreamSink
FileStreamSource对应源码中的FileStreamSourceConnector
FileStreamSink对应源码中的FileStreamSinkConnector
connect-file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/etc/kafka/conf.dist/connect-file-test-data/source.txt
topic=connect-test
connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/etc/kafka/conf.dist/connect-file-test-data/sink.txt
topics=connect-test
三、ConnectStandalone
它将Kafka Connect作为独立进程运行。在此模式下,work(连接器和任务)不会被分配。相反,所有正常的Connect机器都在一个过程中工作。适合一些临时、小型或实验性工作。
在此模式下,连接器和任务配置存储在内存中,不是持久的。但是,连接器偏移数据是持久的,因为它使用文件存储(可通过offset.storage.file.filename配置)
public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {public static void main(String[] args) {ConnectStandalone connectStandalone = new ConnectStandalone(args);//调用父类的run()connectStandalone.run();}}//这里用到了AbstractConnectCli,它里面有Kafka Connect的通用初始化逻辑。
public abstract class AbstractConnectCli<T extends WorkerConfig> {//验证第一个CLI参数、进程工作器属性,并启动Connectpublic void run() {if (args.length < 1 || Arrays.asList(args).contains("--help")) {log.info("Usage: {}", usage());Exit.exit(1);}try {//connect-standalone.properties 配置文件String workerPropsFile = args[0];//从connect-standalone.properties 中将配置的参数都转成 mapMap<String, String> workerProps = !workerPropsFile.isEmpty() ?Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();//这里面存放了 connect-standalone.properties 后的其他参数//比如connect-file-source.properties、connect-file-sink.propertiesString[] extraArgs = Arrays.copyOfRange(args, 1, args.length);//启动ConnectConnect connect = startConnect(workerProps, extraArgs);// 关机将由Ctrl-C或通过HTTP关机请求触发connect.awaitStop();} catch (Throwable t) {log.error("Stopping due to error", t);Exit.exit(2);}}}
总结下来做了两件事情
1、初始化connect-standalone.properties配置信息
2、启动Connect
那什么是Connect,且它有做了什么呢?下面我们来看下
1、启动Connect
public abstract class AbstractConnectCli<T extends WorkerConfig> {public Connect startConnect(Map<String, String> workerProps, String... extraArgs) {//Kafka Connect工作进程初始化log.info("Kafka Connect worker initializing ...");long initStart = time.hiResClockMs();WorkerInfo initInfo = new WorkerInfo();initInfo.logAll();//正在扫描插件类。这可能需要一点时间log.info("Scanning for plugin classes. This might take a moment ...");Plugins plugins = new Plugins(workerProps);plugins.compareAndSwapWithDelegatingLoader();T config = createConfig(workerProps);log.debug("Kafka cluster ID: {}", config.kafkaClusterId());RestClient restClient = new RestClient(config);ConnectRestServer restServer = new ConnectRestServer(config.rebalanceTimeout(), restClient, config.originals());restServer.initializeServer();URI advertisedUrl = restServer.advertisedUrl();String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();//connector.client.config.override.policy 默认值 All//ConnectorClientConfigOverridePolicy 实现的类名或别名。// 定义连接器可以覆盖哪些客户端配置。默认实现为“All”,这意味着连接器配置可以覆盖所有客户端属性。// 框架中的其他可能策略包括“无”禁止连接器覆盖客户端属性,“主体”允许连接器仅覆盖客户端主体ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),config, ConnectorClientConfigOverridePolicy.class);Herder herder = createHerder(config, workerId, plugins, connectorClientConfigOverridePolicy, restServer, restClient);final Connect connect = new Connect(herder, restServer);//Kafka Connect工作进程初始化已完成log.info("Kafka Connect worker initialization took {}ms", time.hiResClockMs() - initStart);try {connect.start();} catch (Exception e) {log.error("Failed to start Connect", e);connect.stop();Exit.exit(3);}//这里面会依次处理 connect-file-source.properties、connect-file-sink.properties 等参数processExtraArgs(herder, connect, extraArgs);return connect;}}
可以理解connect-standalone.properties是用于启动connect的,connect-file-source.properties是用于启动source任务的的,connect-file-sink.properties是用于启动sink任务的,source、sink的任务是在processExtraArgs(herder, connect, extraArgs)中完成的,这里用到了三个参数,
1、Herder:牧民,可用于在Connect群集上执行操作的实例,它里面有Worker
2、Connect:这个类将Kafka Connect进程的所有组件(牧民、工人、存储、命令接口)联系在一起,管理它们的生命周期
3、extraArgs:用于配置source、sink任务的参数
2、启动source、sink任务
这里我们对着参数(connect-file-source.properties、connect-file-sink.properties)来看更为形象
protected void processExtraArgs(Herder herder, Connect connect, String[] extraArgs) {try {//一个配置一个配置去解析//按照官方的例子会依次解析connect-file-source.properties、connect-file-sink.propertiesfor (final String connectorConfigFile : extraArgs) {CreateConnectorRequest createConnectorRequest = parseConnectorConfigurationFile(connectorConfigFile);FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, info) -> {if (error != null)log.error("Failed to create connector for {}", connectorConfigFile);elselog.info("Created connector {}", info.result().name());});//依次把每个配置文件的解析结果放入 herder 牧民中herder.putConnectorConfig(createConnectorRequest.name(), createConnectorRequest.config(),createConnectorRequest.initialTargetState(),false, cb);//Future的get方法是一个阻塞方法,用于获取任务的运行结果。当调用get方法时,如果任务尚未完成,线程会阻塞,直到任务完成。cb.get();}} catch (Throwable t) {//.....}}
下面我们分别用connect-file-source.properties、connect-file-sink.properties带入看看牧民(herder)是如何将任务进行执行的。一份文件就是一个connector,这里我们先分析StandaloneHerder
public class StandaloneHerder extends AbstractHerder {private final ScheduledExecutorService requestExecutorService;StandaloneHerder(Worker worker,String workerId,String kafkaClusterId,StatusBackingStore statusBackingStore,MemoryConfigBackingStore configBackingStore,ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,Time time) {super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore, connectorClientConfigOverridePolicy, time);this.configState = ClusterConfigState.EMPTY;//创建一个单线程执行器,可以安排命令在给定延迟后运行,或定期执行。this.requestExecutorService = Executors.newSingleThreadScheduledExecutor();configBackingStore.setUpdateListener(new ConfigUpdateListener());}public void putConnectorConfig(final String connName, final Map<String, String> config, final TargetState targetState,final boolean allowReplace, final Callback<Created<ConnectorInfo>> callback) {try {validateConnectorConfig(config, (error, configInfos) -> {if (error != null) {callback.onCompletion(error, null);return;}//向执行器提交 source 或 sink 的 ConnectorConfigrequestExecutorService.submit(() -> putConnectorConfig(connName, config, targetState, allowReplace, callback, configInfos));});} catch (Throwable t) {callback.onCompletion(t, null);}}private synchronized void putConnectorConfig(String connName,final Map<String, String> config,TargetState targetState,boolean allowReplace,final Callback<Created<ConnectorInfo>> callback,ConfigInfos configInfos) {try {//........//将此连接器配置(以及可选的目标状态)写入持久存储,并等待其被确认,//然后通过在Kafka日志中添加消费者来读回。如果将worker配置为使用fencable生产者写入配置topic,//则必须在调用此方法之前成功调用claimWritePrivileges()configBackingStore.putConnectorConfig(connName, config, targetState);startConnector(connName, (error, result) -> {if (error != null) {callback.onCompletion(error, null);return;}//执行器提交任务requestExecutorService.submit(() -> {updateConnectorTasks(connName);callback.onCompletion(null, new Created<>(created, createConnectorInfo(connName)));});});} catch (Throwable t) {callback.onCompletion(t, null);}}private synchronized void updateConnectorTasks(String connName) {//......//配置 connectorTask 这里会用到配置文件中的connector.class//既:List<Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connName);List<Map<String, String>> rawTaskConfigs = reverseTransform(connName, configState, newTaskConfigs);if (taskConfigsChanged(configState, connName, rawTaskConfigs)) {removeConnectorTasks(connName);configBackingStore.putTaskConfigs(connName, rawTaskConfigs);createConnectorTasks(connName);}}private void createConnectorTasks(String connName) {List<ConnectorTaskId> taskIds = configState.tasks(connName);createConnectorTasks(connName, taskIds);}private void createConnectorTasks(String connName, Collection<ConnectorTaskId> taskIds) {Map<String, String> connConfigs = configState.connectorConfig(connName);for (ConnectorTaskId taskId : taskIds) {//依次启动每个task(配置的 source和sink task)startTask(taskId, connConfigs);}}private boolean startTask(ConnectorTaskId taskId, Map<String, String> connProps) {switch (connectorType(connProps)) {case SINK:return worker.startSinkTask(taskId,configState,connProps,configState.taskConfig(taskId),this,configState.targetState(taskId.connector()));case SOURCE:return worker.startSourceTask(taskId,configState,connProps,configState.taskConfig(taskId),this,configState.targetState(taskId.connector()));default:throw new ConnectException("Failed to start task " + taskId + " since it is not a recognizable type (source or sink)");}}}
牧民(Herder)会用Worker来启动配置的SouceTask和SinkTask,最终他们调用的还是同一个方法,只是任务构建器不同而已,下面我们继续分析
Worker启动SouceTask
public boolean startSourceTask(...) {return startTask(id, connProps, taskProps, configState, statusListener,new SourceTaskBuilder(id, configState, statusListener, initialState));}
Worker启动SinkTask
public boolean startSinkTask(...) {return startTask(id, connProps, taskProps, configState, statusListener,new SinkTaskBuilder(id, configState, statusListener, initialState));}
下面我们共同来分析startTask(...)
//线程池//Executors.newCachedThreadPool()private final ExecutorService executor;private boolean startTask(...){//......//从 connector.class 获取类进行加载String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);ClassLoader connectorLoader = plugins.connectorLoader(connType);//......final Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);//对应的sourceTask 和 sinkTaskfinal Task task = plugins.newTask(taskClass);//此处就是根据传的参数来workerTask 会加载不同的Task// SourceTaskBuilder ---> SouceTask// SinkTaskBuilder ---> SinkTaskworkerTask = taskBuilder.withTask(task).withConnectorConfig(connConfig).withKeyConverter(keyConverter).withValueConverter(valueConverter).withHeaderConverter(headerConverter).withClassloader(connectorLoader).build();workerTask.initialize(taskConfig);WorkerTask<?, ?> existing = tasks.putIfAbsent(id, workerTask);//我们继续往下分析,看看 SourceTask 和 SinkTask 都是怎么执行的executor.submit(plugins.withClassLoader(connectorLoader, workerTask));if (workerTask instanceof WorkerSourceTask) {SourceTask 有一个单独 定时提交 offset 的 任务,默认间隔为 1minsourceTaskOffsetCommitter.ifPresent(committer -> committer.schedule(id, (WorkerSourceTask) workerTask));}return true;}
FileStreamSource对应的Task为:FileStreamSourceTask
FileStreamSink对应的Task为:FileStreamSinkTask
Worker会将WorkerTask调起去生产和消费数据
3、调度运行WorkerTask
WorkerTask会提供Worker用于管理任务的基本方法。实现将用户指定的Task与Kafka相结合,以创建数据流。且WorkerTask会放到线程池中进行调度,下面我们看下它的run()
//以下只是主要代码
abstract class WorkerTask<T, R extends ConnectRecord<R>> implements Runnable {public void run() {doRun();}private void doRun() throws InterruptedException {//会初始化 我们在connect-file-source.properties、connect-file-sink.properties中对Producer、Consumer的参数配置doStart();//真的开始// 用对应的SourceTask去读取数据,并交由Producer去生产// 用Consumer接收数据交由 SinkTask去处理数据execute();}}
doStart()
void doStart() {retryWithToleranceOperator.reporters(errorReportersSupplier.get());initializeAndStart();statusListener.onStartup(id);}
Source 和 Sink 会对initializeAndStart()有不同的实现
Source
这里用的是WorkerTask的子类:AbstractWorkerSourceTask
public abstract class AbstractWorkerSourceTask extends WorkerTask<SourceRecord, SourceRecord> {protected void initializeAndStart() {prepareToInitializeTask();offsetStore.start();//启动标记设置为 truestarted = true;//使用指定的上下文对象初始化此SourceTask。task.initialize(sourceTaskContext);//启动任务。这应该处理任何配置解析和任务的一次性设置。//这里会实际调用 FileStreamSourceTask 或者我们指定的其他 SourceTask的 start()task.start(taskConfig);log.info("{} Source task finished initialization and start", this);}}
public class FileStreamSourceTask extends SourceTask {public void start(Map<String, String> props) {AbstractConfig config = new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, props);//name=local-file-source//connector.class=FileStreamSource//tasks.max=1//file=/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/etc/kafka/conf.dist/connect-file-test-data/source.txt//topic=connect-test// filefilename = config.getString(FileStreamSourceConnector.FILE_CONFIG);if (filename == null || filename.isEmpty()) {stream = System.in;//跟踪stdin的偏移量没有意义streamOffset = null;reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));}//topic=connect-testtopic = config.getString(FileStreamSourceConnector.TOPIC_CONFIG);//batch.sizebatchSize = config.getInt(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG);}}
Sink
这里用的是WorkerTask的子类:WorkerSinkTask
protected void initializeAndStart() {SinkConnectorConfig.validate(taskConfig);//订阅 topicif (SinkConnectorConfig.hasTopicsConfig(taskConfig)) {List<String> topics = SinkConnectorConfig.parseTopicsList(taskConfig);consumer.subscribe(topics, new HandleRebalance());log.debug("{} Initializing and starting task for topics {}", this, String.join(", ", topics));} else {//topics.regex//根据正则设置的 要消费的 topicString topicsRegexStr = taskConfig.get(SinkTask.TOPICS_REGEX_CONFIG);Pattern pattern = Pattern.compile(topicsRegexStr);consumer.subscribe(pattern, new HandleRebalance());log.debug("{} Initializing and starting task for topics regex {}", this, topicsRegexStr);}//初始化此任务的上下文。task.initialize(context);//启动任务。这应该处理任何配置解析和任务的一次性设置。//这里会真正的调用 FileStreamSinkTask 或者 其他我们配置的SinkTask 的 start()task.start(taskConfig);log.info("{} Sink task finished initialization and start", this);}
public class FileStreamSinkTask extends SinkTask {public void start(Map<String, String> props) {AbstractConfig config = new AbstractConfig(FileStreamSinkConnector.CONFIG_DEF, props);filename = config.getString(FileStreamSinkConnector.FILE_CONFIG);if (filename == null || filename.isEmpty()) {outputStream = System.out;} else {try {//根据我们在配置文件中结果文件 创建输出流outputStream = new PrintStream(Files.newOutputStream(Paths.get(filename), StandardOpenOption.CREATE, StandardOpenOption.APPEND),false,StandardCharsets.UTF_8.name());} catch (IOException e) {throw new ConnectException("Couldn't find or create file '" + filename + "' for FileStreamSinkTask", e);}}}}
execute()
为了看的清晰,这里我们只列举主要代码
Source
public abstract class AbstractWorkerSourceTask extends WorkerTask<SourceRecord, SourceRecord> {public void execute() {while (!isStopping()) {//这里会调用 task.poll(); 也就是从文件读取数据toSend = poll();//这里真的就会调用producer.send() 发送数据sendRecords()}}
}
Sink
class WorkerSinkTask extends WorkerTask<ConsumerRecord<byte[], byte[]>, SinkRecord> {public void execute() {while (!isStopping())iteration();}}protected void iteration() {poll(timeoutMs);}protected void poll(long timeoutMs) {//用consumer拉回数据 ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs);//转化并交由 自己定义的 SinkTask 处理数据convertMessages(msgs);deliverMessages()}private ConsumerRecords<byte[], byte[]> pollConsumer(long timeoutMs) {ConsumerRecords<byte[], byte[]> msgs = consumer.poll(Duration.ofMillis(timeoutMs));}
}
相关文章:
Kafka-Connect源码分析
一、上下文 《Kafka-Connect自带示例》中我们尝试了零配置启动producer和consumer去生产和消费数据,那么它内部是如何实现的呢?下面我们从源码来揭开它神秘的面纱。 二、入口类有哪些? 从启动脚本(connect-standalone.sh&#…...
【STM32 Modbus编程】-作为主设备读取保持/输入寄存器
作为主设备读取保持/输入寄存器 文章目录 作为主设备读取保持/输入寄存器1、硬件准备与连接1.1 RS485模块介绍1.2 硬件配置与接线1.3 软件准备2、读保持寄存器2.1 主设备发送请求2.2 从设备响应请求2.3 主机接收数据3、读输入寄存器4、结果4.1 保持寄存器4.2 输入寄存器在前面的…...
Kubesphere上搭建redis集群
Kubesphere上搭建redis集群 版本:redis:6.2.3 1)挂载配置 redis.conf: cluster-enabled yes cluster-config-file nodes.conf cluster-node-timeout 5000 cluster-require-full-coverage no cluster-migration-barrier 1 appendonly yes …...
learn-(Uni-app)跨平台应用的框架
使用 Vue.js 开发所有前端应用的框架,开发者编写一份代码,可发布到iOS、Android、Web(包括微信小程序、百度小程序、支付宝小程序、字节跳动小程序、H5、App等)等多个平台。 跨平台:Uni-app 支持编译到iOS、Android、W…...
target_compile_definitions
这个接口给目标定义的宏,不能像C中定义的宏一样,尝试利用宏进行替换: cmake_minimum_required(VERSION 3.8) project(compile_definitions_pro)add_executable(main_exec src/main.cpp)set(SYSTEM_NAME "") if(CMAKE_SYSTEM_NAME S…...
浏览器同源策略、跨域、跨域请求,服务器处理没、跨域解决方案
目录 什么是同源策略什么是跨域发生跨域时,服务器有没有接到请求并处理响应:(两种情况) 如何解决跨域 什么是同源策略 概念: 同源策略是浏览器的一种安全机制,用于防止恶意网站对用户的敏感数据进行未经授…...
深入理解网络安全等级保护:保障信息安全的关键策略与实践
随着信息技术的飞速发展,网络安全问题日益凸显。为了应对这一挑战,网络安全等级保护制度应运而生,旨在确保不同等级的信息和信息系统的安全。本文将探讨网络安全等级保护的基本概念、重要性及其实践方法。 一、信息安全等级保护的基本概念 1…...
MySQL
InnoDB 引擎底层存储和缓存原理 到目前为止,MySQL 对于我们来说还是一个黑盒,我们只负责使用客户端发 送请求并等待服务器返回结果,表中的数据到底存到了哪里?以什么格式存放的? MySQL 是以什么方式来访问的这些数据&…...
新书速览|循序渐进Node.js企业级开发实践
《循序渐进Node.js企业级开发实践》 1 本书内容 《循序渐进Node.js企业级开发实践》结合作者多年一线开发实践,系统地介绍了Node.js技术栈及其在企业级开发中的应用。全书共分5部分,第1部分基础知识(第1~3章)…...
2024三掌柜赠书活动第三十五期:Redis 应用实例
目录 前言 Redis操作都会,却不知道怎么用? 关于《Redis 应用实例》 编辑推荐 内容简介 作者简介 图书目录 《Redis 应用实例》全书速览 拓展:Redis使用场景 实例1:缓存应用 场景描述 实现方法 具体代码示例 实例2&a…...
Android 第三方框架:RxJava:源码分析:观察者模式
文章目录 观察者模式RxJava中的观察者模式总结 观察者模式 RxJava中的观察者模式 以Observable、ObservableOnSubscribe、Observer为例 Observable是被观察者 负责发射事件或数据 Observer是观察器 负责对从被观察者中获取的数…...
开源模型应用落地-安全合规篇-用户输入价值观判断(四)
一、前言 在深度合规功能中,对用户输入内容的价值观判断具有重要意义。这一功能不仅仅是对信息合法性和合规性的简单审核,更是对信息背后隐含的伦理道德和社会责任的深刻洞察。通过对价值观的判断,系统能够识别可能引发不当影响或冲突的内容,从而为用户提供更安全、更和谐的…...
【js逆向专题】13.jsvmp补环境篇一
目录 一.了解jsvmp技术1. js虚拟机保护方案2.jsvmp实现原理3. 模拟jsvmp执行过程 二.环境检测1. 什么是环境检测2.案例讲解 三. 项目实战1. 案例11.逆向目标2. 项目分析1.补第一个referrer2. 调试技巧13. 调试技巧24. 补充sign5. 补 length6. 参数长短补充 3. 逆向结果 2. 案例…...
Java---每日小题
题目1-极大极小游戏 给你一个下标从 0 开始的整数数组 nums ,其长度是 2 的幂。 对 nums 执行下述算法: 设 n 等于 nums 的长度,如果 n 1 ,终止 算法过程。否则,创建 一个新的整数数组 newNums ,新数组长度…...
leetcode 23. 合并 K 个升序链表
给你一个链表数组,每个链表都已经按升序排列。 输入:lists [[1,4,5],[1,3,4],[2,6]] 输出:[1,1,2,3,4,4,5,6] 解释:链表数组如下: [1->4->5,1->3->4,2->6 ] 将它们合并到一个有序链表中得到。 1->…...
Windows 小记 6 -- 为什么我的全局消息钩子卸载不掉?
Hook dll 在其消息循环中被卸载。强制它们进入消息循环有助于卸载它们。在 UnhookWindowsHookEx 之后添加此代码以强制唤醒所有消息循环: DWORD dwResult; SendMessageTimeout(HWND_BROADCAST, WM_NULL, 0, 0, SMTO_ABORTIFHUNG|SMTO_NOTIMEOUTIFNOTHUNG, 1000, &a…...
Python+onlyoffice 实现在线word编辑
onlyoffice部署 version: "3" services:onlyoffice:image: onlyoffice/documentserver:7.5.1container_name: onlyofficerestart: alwaysenvironment:- JWT_ENABLEDfalse#- USE_UNAUTHORIZED_STORAGEtrue#- ONLYOFFICE_HTTPS_HSTS_ENABLEDfalseports:- "8080:8…...
LC低通滤波器Bode图分析(传递函数零极点)
LC低通滤波器 我们使得L4.7uH,C220uF;电感L的阻抗为Xl;电容C的阻抗为Xc; 传递函数 H ( s ) u o u i X C X C X L 1 s C 1 s C s L 1 1 s 2 L C (其中 s j ω ) H(s)\frac{u_{o} }{u_{i} } \frac{…...
【机器学习】机器学习的基本分类-无监督学习(Unsupervised Learning)
无监督学习(Unsupervised Learning) 无监督学习是一种机器学习方法,主要用于没有标签的数据集。其目标是从数据中挖掘出潜在的结构和模式。常见的无监督学习任务包括 聚类、降维、密度估计 和 异常检测。 1. 无监督学习的核心目标 1.1 聚类…...
六、docker compose单机容器编排工具
六、docker compose单机容器编排工具 6.1 compose简介 Compose是一个用于定义和运行多容器Docker应用程序的工具。您可以使用Compose文件来配置应用程序的服务,然后使用单个命令从配置中创建并启动所有服务。compose的配置文件示例如下 compose的github网址&#…...
Python3 operator 模块
Python2.x 版本中,使用 cmp() 函数来比较两个列表、数字或字符串等的大小关系。 Python 3.X 的版本中已经没有 cmp() 函数,如果你需要实现比较功能,需要引入 operator 模块,适合任何对象,包含的方法有: o…...
沪合共融 “汽”势如虹 | 昂辉科技参加合肥上海新能源汽车产业融合对接会
为积极响应制造业重点产业链高质量发展行动号召,促进合肥、上海两地新能源汽车产业链上下游企业融合对接、协同发展,共同打造长三角世界级新能源汽车产业集群,11月28日,合肥市工信局组织部分县区工信部门及全市30余户新能源汽车产…...
访问http网页强制跳转到了https的解决办法
目录 解决浏览器自动从 HTTP 重定向到 HTTPS 的问题问题原因:HSTS(HTTP Strict Transport Security)什么是 HSTS?HSTS 的工作原理 如何解决?1. 清除浏览器的 HSTS 信息在 Chrome 中清除 HSTS 信息:在 Firef…...
PDF处理的创新工具:福昕低代码平台尝鲜
在当今数字化时代,PDF文件的处理和管理变得越来越重要。福昕低代码平台是新发布的一款创新的工具,旨在简化PDF处理和管理的流程。通过这个平台,用户可以通过简单的拖拽界面上的按钮,轻松完成对Cloud API的调用工作流,而…...
EmoAva:首个大规模、高质量的文本到3D表情映射数据集。
2024-12-03,由哈尔滨工业大学(深圳)的计算机科学系联合澳门大学、新加坡南洋理工大学等机构创建了EmoAva数据集,这是首个大规模、高质量的文本到3D表情映射数据集,对于推动情感丰富的3D头像生成技术的发展具有重要意义…...
SpringMVC ——(1)
1.SpringMVC请求流程 1.1 SpringMVC请求处理流程分析 Spring MVC框架也是⼀个基于请求驱动的Web框架,并且使⽤了前端控制器模式(是⽤来提供⼀个集中的请求处理机制,所有的请求都将由⼀个单⼀的处理程序处理来进⾏设计,再根据请求…...
测试工具LoadRunner Professional脚本编写-脚本设置
勾选扩展日志-全选 原因:在并发完成后,通过抽查关键用户日志的方式,检查参数化是否如预期一致,比如抽查用户1(仓库一,物品一),用户11(仓库二,物品一),用户100(仓库十,物品十) 设置忽略思考时间 原因:是否忽略思考时间,请求数可能会有几十倍的差距…...
运用蓝光三维扫描仪的艺术与科技的完美融合-石膏头像模型3D扫描真实复刻
石膏头像具有独特的魅力,每一处细节都彰显着艺术之美。无论是深邃的眼神,还是精致的轮廓,都让人陶醉其中。 随着雕塑形式的日渐丰富,越来越多的新材料和新的塑造手法被运用到雕塑创作中,蓝光三维扫描技术的应用&#…...
文本域设置高度 加上文字限制并show出来:
文本域设置高度 :rows"4" 加上文字限制并show出来: maxlength"30" show-word-limit 效果: <el-form-item label"产品备注" prop"remark"><el-input v-model"form.remark" type"textarea"…...
探索数据确权、隐私保护、安全共享等方面的挑战与解决方案
在数据确权、隐私保护、安全共享等方面,当前确实面临着诸多挑战,同时也存在一些有效的解决方案。以下是对这些方面的详细探讨: 一、数据确权 挑战 权属关系模糊:由于数据具有复杂性和隐蔽性等特点,使得数据的权属关…...
麒麟 V10(ky10.x86_64)无网环境下 openssl - 3.2.2 与 openssh - 9.8p1 升级【最全教程】
目录 背景 安装包下载 上传解压安装包 安装zlib 安装OpenSSL 安装OpenSSH 验证 背景 近期,项目上线已进入倒计时阶段,然而在至关重要的安全检查环节中,却惊现现有的 OpenSSH 存在一系列令人担忧的漏洞: OpenSSH 资源管理错…...
前端技术(23) : 聊天页面
来源: GPT生成之后微调 效果图 HTML代码 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>聊天</t…...
ArcMap 处理河道坡度、计算污染区、三维爆炸功能
ArcMap 处理河道坡度、计算污染区、三维爆炸功能今天分析 一、计算河道方向坡度 1、折线转栅格 确定 2、提取河道高程值 确定后展示河流的高程值 3、计算坡向数据 确定后展示 4、计算坡度数据 确定后展示 二、计算上游集水区污染值 1、填挖处理 确定 2、计算流向 确定 3、计算…...
数据结构 (30)计算式查找法——哈希法
前言 数据结构中的计算式查找法,特别是哈希法(又称散列法、杂凑法、关键字地址计算法),是一种高效的查找技术。 一、哈希法的基本概念 哈希法是通过一个哈希函数将关键字映射到哈希表中的某个位置,从而实现快速查找的技…...
电子商务人工智能指南 4/6 - 内容理解
介绍 81% 的零售业高管表示, AI 至少在其组织中发挥了中等至完全的作用。然而,78% 的受访零售业高管表示,很难跟上不断发展的 AI 格局。 近年来,电子商务团队加快了适应新客户偏好和创造卓越数字购物体验的需求。采用 AI 不再是一…...
交易系统:线上交易系统流程详解
大家好,我是汤师爷~ 今天聊聊线上交易系统流程详解。 线上交易系统为新零售连锁商家提供一站式线上交易解决方案。其核心目标是,通过数字化手段扩大商家的服务范围,突破传统门店的地理限制。系统支持电商、O2O等多种业务形态,为…...
如何通过自学成长为一名后端开发工程师?
大家好,我是袁庭新。最近,有星友向我提出了一个很好的问题:如何通过自学成为一名后端开发工程师? 为了解答这个疑问,我特意制作了一个视频来详细分享我的看法和建议。 戳链接:如何通过自学成长为一名后端开…...
实际车辆行驶轨迹与预设路线偏离检测的Java实现
准备工作 本项目依赖于两个关键库:JTS Topology Suite(简称JTS),用于几何对象创建和空间分析;以及GeoTools,用于处理坐标转换和其他地理信息任务。确保开发环境中已经包含了这两个库,并且正确配…...
pci_resource相关函数
一、介绍 pci_resource_start函数用于获取PCI设备中指定Bar寄存器记录资源起始地址, 函数原型: resource_size_t pci_resource_start(struct pci_dev *dev, int bar) 参数: dev: PCI 设备结构体指针 bar: BAR 寄存器索引 (0-5) 返回&a…...
Android Studio 历史版本下载
Android Studio 历史版本下载 官方链接:https://developer.android.google.cn/studio/archive 通过gradle插件版本反查Android Studio历史版本 Android Studio Ladybug | 2024.2.1 October 1, 2024 【https://redirector.gvt1.com/edgedl/android/studio/ide-zip…...
Jupyter Lab打印日志
有时候在 jupyter 中执行运行时间较长的程序,且需要一直信息,但是程序执行到某些时候就不再打印了。 可以开启 日志控制台,将日志信息记录在控制台中。 参考:https://www.autodl.com/docs/jupyterlab/...
guava缓存的get方法的回调函数讲解一下
CacheBuilder.newBuilder()//设置缓存初始大小,应该合理设置,后续会扩容.initialCapacity(10)//最大值.maximumSize(100)//并发数设置.concurrencyLevel(5)//缓存过期时间,写入后10分钟过期.expireAfterWrite(600,TimeUnit.SECONDS)//统计缓存…...
【双分派小结】
双分派(Double Dispatch)是一种面向对象编程中的设计模式,通常用于实现多态性,尤其是在涉及多个对象交互时。它的基本思想是通过两个不同的对象来确定方法调用,而不仅仅是依赖于一个对象。 双分派的工作原理 在普通的…...
Python100道练习题
Python100道练习题 BIlibili 1、两数之和 num1 20 num2 22result num1 num2print(result)2、一百以内的偶数 list1 []for i in range(1,100):if i % 2 0:list1.append(i) print(list1)3、一百以内的奇数 # 方法一 list1 [] for i in range(1,100):if i % 2 ! 0:lis…...
Scala—Slice(提取子序列)方法详解
Scala—Slice(提取子序列)方法详解 在 Scala 中,slice 方法用于从集合中提取一个连续的子序列(切片)。可以应用于多种集合类型,如 List、Array、Seq 等。 一、slice 方法的定义 slice 根据提供的起始索引…...
nginx根据报文里字段转发至不同地址
nginx接收到post请求.请求报文里是一个json字符串,字符串里有个字段id。 根据id不同,转发到不同地址。 如果idaaa,转发到www.aaa.com.test 如果idbbb,转发到www.bbb.com.test 如何配置,请提供一个nginx.conf 要在 Nginx 中根据 POST 请求的 JSON 负载中的…...
Kafka单机及集群部署及基础命令
目录 一、 Kafka介绍1、kafka定义2、传统消息队列应用场景3、kafka特点和优势4、kafka角色介绍5、分区和副本的优势6、kafka 写入消息的流程 二、Kafka单机部署1、基础环境2、iptables -L -n配置3、下载并解压kafka部署包至/usr/local/目录4、修改server.properties5、修改/etc…...
TCP Robot Send Recive
Function main String data$ 定义字符串变量 SetNet #205, "192.168.0.1", 2004, CRLF, NONE, 0 设置端口号IP地址 OpenNet #205 As Server 端口号对应pc机的端口号 Print "等待201端口连接" WaitNet #201 等待201网…...
旅游管理系统|Java|SSM|VUE| 前后端分离
【重要1⃣️】前后端源码万字文档部署文档 【重要2⃣️】正版源码有问题包售后 【包含内容】 【一】项目提供非常完整的源码注释 【二】相关技术栈文档 【三】源码讲解视频 【其它服务】 【一】可以提供远程部署安装…...
qt-everywher交叉编译e-src-5.15.2
简化配置的方式: 你完全可以通过直接配置 安装目录、编译链 和 目标架构 来完成交叉编译,而不需要修改 mkspecs 配置。以下是如何通过简化配置来进行交叉编译 Qt 的步骤。 准备交叉编译工具链 首先,确保你已经安装了交叉编译工具链ÿ…...