【Flink CDC】Flink CDC的Schema Evolution表结构演变的源码分析和流程图
Flink CDC版本:3.2.1
说明:本文从SchemaOperator接收到,表结构变更事件开始,表结构变更事件应由source端产生,本文不讨论。
可以先看流程图,研究源码。
参考文章:
Flink cdc3.0动态变更表结构——源码解析
一、源码解析
以Sink to doris举例:
SchemaOperator
org.apache.flink.cdc.runtime.operators.schema.SchemaOperator
判断是否是SchemaChangeEvent
事件,调用processSchemaChangeEvents
方法
/** * This method is guaranteed to not be called concurrently with other methods of the operator. */
@Override
public void processElement(StreamRecord<Event> streamRecord) throws InterruptedException, TimeoutException, ExecutionException { Event event = streamRecord.getValue(); if (event instanceof SchemaChangeEvent) { // (0)processSchemaChangeEvents((SchemaChangeEvent) event); } else if (event instanceof DataChangeEvent) { // (13)processDataChangeEvents(streamRecord, (DataChangeEvent) event); } else { throw new RuntimeException("Unknown event type in Stream record: " + event); }
}
调用handleSchemaChangeEvent
方法:
private void processSchemaChangeEvents(SchemaChangeEvent event) throws InterruptedException, TimeoutException, ExecutionException { TableId tableId = event.tableId(); LOG.info( "{}> Table {} received SchemaChangeEvent {} and start to be blocked.", subTaskId, tableId, event); handleSchemaChangeEvent(tableId, event); // Update caches originalSchema.put(tableId, getLatestOriginalSchema(tableId)); schemaDivergesMap.put(tableId, checkSchemaDiverges(tableId)); List<TableId> optionalRoutedTable = getRoutedTables(tableId); if (!optionalRoutedTable.isEmpty()) { tableIdMappingCache .get(tableId) .forEach(routed -> evolvedSchema.put(routed, getLatestEvolvedSchema(routed))); } else { evolvedSchema.put(tableId, getLatestEvolvedSchema(tableId)); }
}
handleSchemaChangeEvent
调用requestSchemaChange
方法,请求修改Schema:
response.isAccepted()
就是注册中心接收了此修改需求。进入if后,重点来了:output.collect(new StreamRecord<>(new FlushEvent(tableId)));
。注意这里发送了一个new FlushEvent(tableId)
事件,这个事件会在SinkWriter
用到,就是通知SinkWriter
要执行flush
,即把数据刷入到sink端数据库,和jdbc的commit
相似。
FlushEvent内容非常简单只有tableId
但是其类型是FlushEvent
,此类的doc内容是:
- An {@link Event} from {@code SchemaOperator} to notify {@code DataSinkWriterOperator} that it
- start flushing.
也就是FlushEvent作为特殊数据传递事件,接收到此数据的DataSinkWriterOperator
会触发其执行flushing
操作,也就是将目前收到的所有数据都写入目标数据库。可以理解为:
schema修改后的数据 --> FlushEvent(新插入) --> schema修改前的数据
发送FlushEvent
事件后执行requestSchemaChangeResult
方法,此方法是while阻塞的方法,简而言之是等所有writer都完成了FlushEvent
前数据的(旧表结构的数据)写入前,一直阻塞不发送新表结构的数据至下游。
最后finishedSchemaChangeEvents.forEach(e -> output.collect(new StreamRecord<>(e)));
,简略的说其内部是handler方法中生成的SchemaRegistryRequestHandler#applySchemaChange
事件,将原始的SchemaChangeEvent转换成新的数据,还是根据Flink CDC的schema.change.behavior
转换,其类型如下:
![[image-20250106113512324.png]]
具体将这些时间发送至下游怎么用暂时没有研究了。
private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent) throws InterruptedException, TimeoutException { if (schemaChangeBehavior == SchemaChangeBehavior.EXCEPTION && schemaChangeEvent.getType() != SchemaChangeEventType.CREATE_TABLE) { // CreateTableEvent should be applied even in EXCEPTION mode throw new RuntimeException( String.format( "Refused to apply schema change event %s in EXCEPTION mode.", schemaChangeEvent)); } // The request will block if another schema change event is being handled SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent); // (1)if (response.isAccepted()) { // (3)LOG.info("{}> Sending the FlushEvent for table {}.", subTaskId, tableId); output.collect(new StreamRecord<>(new FlushEvent(tableId))); // (4)List<SchemaChangeEvent> expectedSchemaChangeEvents = response.getSchemaChangeEvents(); schemaOperatorMetrics.increaseSchemaChangeEvents(expectedSchemaChangeEvents.size()); // The request will block until flushing finished in each sink writer SchemaChangeResultResponse schemaEvolveResponse = requestSchemaChangeResult(); // (5) List<SchemaChangeEvent> finishedSchemaChangeEvents = schemaEvolveResponse.getFinishedSchemaChangeEvents(); // Update evolved schema changes based on apply results finishedSchemaChangeEvents.forEach(e -> output.collect(new StreamRecord<>(e))); } else if (response.isDuplicate()) { LOG.info( "{}> Schema change event {} has been handled in another subTask already.", subTaskId, schemaChangeEvent); } else if (response.isIgnored()) { LOG.info( "{}> Schema change event {} has been ignored. No schema evolution needed.", subTaskId, schemaChangeEvent); } else { throw new IllegalStateException("Unexpected response status " + response); }
}
requestSchemaChange
是一个阻塞的方法(while (true)
),发送SchemaChangeRequest
直到返回的response
不是Busy
。可以看到发送的的SchemaChangeRequest
。
private SchemaChangeResponse requestSchemaChange( TableId tableId, SchemaChangeEvent schemaChangeEvent) throws InterruptedException, TimeoutException { long schemaEvolveTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis; while (true) { SchemaChangeResponse response = sendRequestToCoordinator( new SchemaChangeRequest(tableId, schemaChangeEvent, subTaskId)); if (response.isRegistryBusy()) { // (2)if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) { LOG.info( "{}> Schema Registry is busy now, waiting for next request...", subTaskId); Thread.sleep(1000); } else { throw new TimeoutException("TimeOut when requesting schema change"); } } else { return response; } }
}
sendRequestToCoordinator
方法是org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway
类的,也就Flink的内部类。
实习类有:
(1)org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway
(2)org.apache.flink.runtime.taskexecutor.rpc.RpcTaskOperatorEventGateway
内部具体逻辑暂不深入了解。
其实际发送至 SchemaRegistry#handleEventFromOperator
private <REQUEST extends CoordinationRequest, RESPONSE extends CoordinationResponse> RESPONSE sendRequestToCoordinator(REQUEST request) { try { CompletableFuture<CoordinationResponse> responseFuture = toCoordinator.sendRequestToCoordinator( getOperatorID(), new SerializedValue<>(request)); return CoordinationResponseUtils.unwrap(responseFuture.get()); } catch (Exception e) { throw new IllegalStateException( "Failed to send request to coordinator: " + request.toString(), e); }
}
requestSchemaChangeResult
执行的操作非常简单,就是等待返回,如果跳出while方法结束,就代表sink端已经完成所有旧数据的flush,在此之前SchemaOperator
类不会向下游发送新数据,因为FlushEvent
后的数据都是schema变更的后的新数据了。
private SchemaChangeResultResponse requestSchemaChangeResult() throws InterruptedException, TimeoutException { CoordinationResponse coordinationResponse = sendRequestToCoordinator(new SchemaChangeResultRequest()); long nextRpcTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis; while (coordinationResponse instanceof SchemaChangeProcessingResponse) { // (6) (7)if (System.currentTimeMillis() < nextRpcTimeOutMillis) { Thread.sleep(1000); coordinationResponse = sendRequestToCoordinator(new SchemaChangeResultRequest()); } else { throw new TimeoutException("TimeOut when requesting release upstream"); } } return ((SchemaChangeResultResponse) coordinationResponse);
}
这里的toCoordinator.sendRequestToCoordinator
也是使用flink内部的调用过程,暂不做研究。
这个发送过程也是被org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry#handleCoordinationRequest
接收了,并在if (request instanceof SchemaChangeResultRequest)
内处理其逻辑。
private <REQUEST extends CoordinationRequest, RESPONSE extends CoordinationResponse> RESPONSE sendRequestToCoordinator(REQUEST request) { try { CompletableFuture<CoordinationResponse> responseFuture = toCoordinator.sendRequestToCoordinator( getOperatorID(), new SerializedValue<>(request)); return CoordinationResponseUtils.unwrap(responseFuture.get()); } catch (Exception e) { throw new IllegalStateException( "Failed to send request to coordinator: " + request.toString(), e); }
}
SchemaRegistry
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry
toCoordinator.sendRequestToCoordinator
方法就由handleCoordinationRequest
接收,进入request instanceof SchemaChangeRequest
中的handleSchemaChangeRequest
方法。
@Override
public CompletableFuture<CoordinationResponse> handleCoordinationRequest( CoordinationRequest request) { CompletableFuture<CoordinationResponse> responseFuture = new CompletableFuture<>(); runInEventLoop( () -> { try { if (request instanceof SchemaChangeRequest) { SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest) request; requestHandler.handleSchemaChangeRequest( schemaChangeRequest, responseFuture); } else if (request instanceof SchemaChangeResultRequest) { requestHandler.getSchemaChangeResult(responseFuture); } else if (request instanceof GetEvolvedSchemaRequest) { handleGetEvolvedSchemaRequest( ((GetEvolvedSchemaRequest) request), responseFuture); } else if (request instanceof GetOriginalSchemaRequest) { handleGetOriginalSchemaRequest( (GetOriginalSchemaRequest) request, responseFuture); } else { throw new IllegalArgumentException( "Unrecognized CoordinationRequest type: " + request); } } catch (Throwable t) { context.failJob(t); throw t; } }, "handling coordination request %s", request); return responseFuture;
}
SchemaRegistry#handleEventFromOperator
方法用于处理DataSinkWriterOperator#handleFlushEvent
发送而来的FlushSuccessEvent
事件。还是使用handler执行具体逻辑:SchemaRegistryRequestHandler#flushSuccess
@Override
public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) { runInEventLoop( () -> { try { if (event instanceof FlushSuccessEvent) { FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) event; LOG.info( "Sink subtask {} succeed flushing for table {}.", flushSuccessEvent.getSubtask(), flushSuccessEvent.getTableId().toString()); requestHandler.flushSuccess( flushSuccessEvent.getTableId(), flushSuccessEvent.getSubtask(), currentParallelism); } else if (event instanceof SinkWriterRegisterEvent) { requestHandler.registerSinkWriter( ((SinkWriterRegisterEvent) event).getSubtask()); } else { throw new FlinkException("Unrecognized Operator Event: " + event); } } catch (Throwable t) { context.failJob(t); throw t; } }, "handling event %s from subTask %d", event, subtask);
}
SchemaRegistryRequestHandler
SchemaRegistryRequestHandler
是SchemaRegistry的执行器,类中schemaChangeStatus
是自己的状态记录状态的。
而pendingSubTaskIds
是记录待处理的任务id的,即数据流ID,是含有一个任务所有的并行度的子任务ID。
此处:
(1)pendingSubTaskIds空 -> 继续执行
(2)requestSubTaskId和发送过来的一样,则为移除头一个。
(3)其他pendingSubTaskIds不为空情形,则直接返回SchemaChangeResponse.busy()
,此处的busy就和SchemaOperator的response.isRegistryBusy()
对应上了。
继续执行:
calculateDerivedSchemaChangeEvents
方法是对事件作息写转换,根据的是flink的schema evolution
的策略进行转换,例如通过返回空集合的方式进行忽略 。
`schema.change.behavior` is of enum type, and could be set to `exception`, `evolve`, `try_evolve`, `lenient` or `ignore`.
而后此handler
的状态修改为WAITING_FOR_FLUSH
。
并返回ResponseCode.ACCEPTED
的状态,此时程序跳转回SchemaOperator#handleSchemaChangeEvent
方法。
SchemaRegistryRequestHandler#handleSchemaChangeRequest
方法:
/** * Handle the {@link SchemaChangeRequest} and wait for all sink subtasks flushing. * * @param request the received SchemaChangeRequest */public void handleSchemaChangeRequest( SchemaChangeRequest request, CompletableFuture<CoordinationResponse> response) { // We use requester subTask ID as the pending ticket, because there will be at most 1 schema // change requests simultaneously from each subTask int requestSubTaskId = request.getSubTaskId(); synchronized (schemaChangeRequestLock) { // Make sure we handle the first request in the pending list to avoid out-of-order // waiting and blocks checkpointing mechanism. if (schemaChangeStatus == RequestStatus.IDLE) { if (pendingSubTaskIds.isEmpty()) { LOG.info( "Received schema change event request {} from table {} from subTask {}. Pending list is empty, handling this.", request.getSchemaChangeEvent(), request.getTableId().toString(), requestSubTaskId); } else if (pendingSubTaskIds.get(0) == requestSubTaskId) { LOG.info( "Received schema change event request {} from table {} from subTask {}. It is on the first of the pending list, handling this.", request.getSchemaChangeEvent(), request.getTableId().toString(), requestSubTaskId); pendingSubTaskIds.remove(0); } else { LOG.info( "Received schema change event request {} from table {} from subTask {}. It is not the first of the pending list ({}).", request.getSchemaChangeEvent(), request.getTableId().toString(), requestSubTaskId, pendingSubTaskIds); if (!pendingSubTaskIds.contains(requestSubTaskId)) { pendingSubTaskIds.add(requestSubTaskId); } response.complete(wrap(SchemaChangeResponse.busy())); // (2) return; } SchemaChangeEvent event = request.getSchemaChangeEvent(); // If this schema change event has been requested by another subTask, ignore it. if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) { LOG.info("Event {} has been addressed before, ignoring it.", event); clearCurrentSchemaChangeRequest(); LOG.info( "SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.", request); response.complete(wrap(SchemaChangeResponse.duplicate())); return; } schemaManager.applyOriginalSchemaChange(event); List<SchemaChangeEvent> derivedSchemaChangeEvents = calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent()); // (14)// If this schema change event is filtered out by LENIENT mode or merging table // route strategies, ignore it. if (derivedSchemaChangeEvents.isEmpty()) { LOG.info("Event {} is omitted from sending to downstream, ignoring it.", event); clearCurrentSchemaChangeRequest(); LOG.info( "SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.", request); response.complete(wrap(SchemaChangeResponse.ignored())); return; } LOG.info( "SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will be blocked."); // This request has been accepted. schemaChangeStatus = RequestStatus.WAITING_FOR_FLUSH; // (3)currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents); response.complete(wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents))); // (3) } else { LOG.info( "Schema Registry is busy processing a schema change request, could not handle request {} for now. Added {} to pending list ({}).", request, requestSubTaskId, pendingSubTaskIds); if (!pendingSubTaskIds.contains(requestSubTaskId)) { pendingSubTaskIds.add(requestSubTaskId); } response.complete(wrap(SchemaChangeResponse.busy())); // (2) } }
}
SchemaRegistryRequestHandler#getSchemaChangeResult
方法:
内容就是检查类成员变量SchemaRegistryRequestHandler#schemaChangeStatus
的状态:
- FINISHED -> 重置自身状态并返回FINISHED状态
- 非FINISHED -> 返回Processing状态,
SchemaOperator#requestSchemaChangeResult
接到SchemaChangeProcessingResponse
会在while一直循环等待阻塞。
public void getSchemaChangeResult(CompletableFuture<CoordinationResponse> response) { Preconditions.checkState( schemaChangeStatus != RequestStatus.IDLE, "Illegal schemaChangeStatus: should not be IDLE before getting schema change request results."); if (schemaChangeStatus == RequestStatus.FINISHED) { // (12)schemaChangeStatus = RequestStatus.IDLE; LOG.info( "SchemaChangeStatus switched from FINISHED to IDLE for request {}", currentDerivedSchemaChangeEvents); // This request has been finished, return it and prepare for the next request List<SchemaChangeEvent> finishedEvents = clearCurrentSchemaChangeRequest(); SchemaChangeResultResponse resultResponse = new SchemaChangeResultResponse(finishedEvents); response.complete(wrap(resultResponse)); } else { // Still working on schema change request, waiting it response.complete(wrap(new SchemaChangeProcessingResponse())); }
}
方法flushSuccess
用于处理DataSinkWriterOperator
返回的FlushSuccessEvent
事件。这里有点不好理解。
activeSinkWriters
是记录所有可用的writer的索引,也就是说writer的并行度可能大于1,activeSinkWriters记录的是writer的索引,接收的FlushSuccessEvent
只是其中一个writer发送的。需要等待所有writer都完成flush才能确定所有的schema修改前的数据都写入数据库了。
(1)if (activeSinkWriters.size() < parallelism)
内的就是上述过程。
(2)if (flushedSinkWriters.equals(activeSinkWriters))
代表所有writer都完成了flush。而后修改handler状态为RequestStatus.APPLYING
,即此handler正在apply schema change
。接下来执行applySchemaChange
方法 。
/** * Record flushed sink subtasks after receiving FlushSuccessEvent. * * @param tableId the subtask in SchemaOperator and table that the FlushEvent is about * @param sinkSubtask the sink subtask succeed flushing */public void flushSuccess(TableId tableId, int sinkSubtask, int parallelism) { flushedSinkWriters.add(sinkSubtask); if (activeSinkWriters.size() < parallelism) { LOG.info( "Not all active sink writers have been registered. Current {}, expected {}.", activeSinkWriters.size(), parallelism); return; } if (flushedSinkWriters.equals(activeSinkWriters)) { Preconditions.checkState( schemaChangeStatus == RequestStatus.WAITING_FOR_FLUSH, "Illegal schemaChangeStatus state: should be WAITING_FOR_FLUSH before collecting enough FlushEvents, not " + schemaChangeStatus); schemaChangeStatus = RequestStatus.APPLYING; // (9)LOG.info( "All sink subtask have flushed for table {}. Start to apply schema change.", tableId.toString()); schemaChangeThreadPool.submit( () -> applySchemaChange(tableId, currentDerivedSchemaChangeEvents)); }
}
SchemaRegistryRequestHandler#applySchemaChange
方法:
内部主要是schemaManager.applyEvolvedSchemaChange(changeEvent)
即执行表结构变更操作,其接口类org.apache.flink.cdc.common.sink.MetadataApplier
的doc内容:
- {@code MetadataApplier} is used to apply metadata changes to external systems.
可以看到schemaManager
至对外部数据执行的表结构变更,其实就是sink端的数据库,其内部一般是收到需要变更的内容,拼接SQL并发送到数据库执行。
最后,修改handler状态为RequestStatus.FINISHED
。
好像此FlushSuccessEvent
没有继续向SchemaOperator
继续传递,其实不然,SchemaOperator
是不断向SchemaRegistry
发送请求的:SchemaOperator#requestSchemaChangeResult
。
而SchemaRegistry
是根据handler状态判断返回值类型的
SchemaRegistryRequestHandler#getSchemaChangeResult
,此时handler状态已经是RequestStatus.FINISHED
,SchemaRegistry
就会给CompletableFuture
填充非SchemaChangeProcessingResponse
了,SchemaOperator类就中断阻塞,继续向下游发送数据了。
/** * Apply the schema change to the external system. * * @param tableId the table need to change schema * @param derivedSchemaChangeEvents list of the schema changes */private void applySchemaChange( TableId tableId, List<SchemaChangeEvent> derivedSchemaChangeEvents) { for (SchemaChangeEvent changeEvent : derivedSchemaChangeEvents) { if (changeEvent.getType() != SchemaChangeEventType.CREATE_TABLE) { if (schemaChangeBehavior == SchemaChangeBehavior.IGNORE) { currentIgnoredSchemaChanges.add(changeEvent); continue; } } if (!metadataApplier.acceptsSchemaEvolutionType(changeEvent.getType())) { LOG.info("Ignored schema change {} to table {}.", changeEvent, tableId); currentIgnoredSchemaChanges.add(changeEvent); } else { try { metadataApplier.applySchemaChange(changeEvent); LOG.info("Applied schema change {} to table {}.", changeEvent, tableId); schemaManager.applyEvolvedSchemaChange(changeEvent); currentFinishedSchemaChanges.add(changeEvent); } catch (Throwable t) { LOG.error( "Failed to apply schema change {} to table {}. Caused by: {}", changeEvent, tableId, t); if (!shouldIgnoreException(t)) { currentChangeException = t; break; } else { LOG.warn( "Failed to apply event {}, but keeps running in tolerant mode. Caused by: {}", changeEvent, t); } } } } Preconditions.checkState( schemaChangeStatus == RequestStatus.APPLYING, "Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes, not " + schemaChangeStatus); schemaChangeStatus = RequestStatus.FINISHED; LOG.info( "SchemaChangeStatus switched from APPLYING to FINISHED for request {}.", currentDerivedSchemaChangeEvents);
}
SchemaRegistryRequestHandler.RequestStatus
类是就handler类状态的类型。具体状态流程可见文档。
// Schema change event state could transfer in the following way:
//
// -------- B --------
// | |
// v |
// -------- ---------------------
// | IDLE | --- A --> | WAITING_FOR_FLUSH |
// -------- ---------------------
// ^ |
// E C
// \ v
// ------------ ------------
// | FINISHED | <-- D -- | APPLYING |
// ------------ ------------
//
// A: When a request came to an idling request handler.
// B: When current request is duplicate or ignored by LENIENT / routed table merging
// strategies.
// C: When schema registry collected enough flush success events, and actually started to apply
// schema changes.
// D: When schema change application finishes (successfully or with exceptions)
// E: When current schema change request result has been retrieved by SchemaOperator, and ready
// for the next request.
private enum RequestStatus { IDLE, WAITING_FOR_FLUSH, APPLYING, FINISHED
}
接下来看下:Sink端的事件处理:
DataSinkWriterOperator
org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperator
org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperator#processElement
方法:
重点是对FlushEvent
的处理
@Override
public void processElement(StreamRecord<Event> element) throws Exception { Event event = element.getValue(); // FlushEvent triggers flush if (event instanceof FlushEvent) { handleFlushEvent(((FlushEvent) event)); return; } // CreateTableEvent marks the table as processed directly if (event instanceof CreateTableEvent) { processedTableIds.add(((CreateTableEvent) event).tableId()); this.<OneInputStreamOperator<Event, CommittableMessage<CommT>>>getFlinkWriterOperator() .processElement(element); return; } // Check if the table is processed before emitting all other events, because we have to make // sure that sink have a view of the full schema before processing any change events, // including schema changes. ChangeEvent changeEvent = (ChangeEvent) event; if (!processedTableIds.contains(changeEvent.tableId())) { emitLatestSchema(changeEvent.tableId()); processedTableIds.add(changeEvent.tableId()); } processedTableIds.add(changeEvent.tableId()); this.<OneInputStreamOperator<Event, CommittableMessage<CommT>>>getFlinkWriterOperator() .processElement(element);
}
handleFlushEvent
方法内只有两个操作:
- flush: 将目前已经接受到所有数据写入目标库(相当于jdbc的commit操作)。
- 发送事件:发送FlushSuccess。
notifyFlushSuccess
内容见类SchemaEvolutionClient
private void handleFlushEvent(FlushEvent event) throws Exception { copySinkWriter.flush(false); // (8) schemaEvolutionClient.notifyFlushSuccess( getRuntimeContext().getIndexOfThisSubtask(), event.getTableId()); // (9)
}
SchemaEvolutionClient
org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient
org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient#notifyFlushSuccess
方法:
发送了FlushSuccessEvent
事件至SchemaRegistry
类的handleEventFromOperator
方法。
public void notifyFlushSuccess(int subtask, TableId tableId) throws IOException { toCoordinator.sendOperatorEventToCoordinator( schemaOperatorID, new SerializedValue<>(new FlushSuccessEvent(subtask, tableId)));
}
TaskOperatorEventGateway
org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway
SchemaOperator
和DataSinkWriterOperator
中的toCoordinator
都是此类对象。
/*
Gateway to send an OperatorEvent or CoordinationRequest from a Task to the OperatorCoordinator JobManager side.
This is the first step in the chain of sending Operator Events and Requests from Operator to Coordinator. Each layer adds further context, so that the inner layers do not need to know about the complete context, which keeps dependencies small and makes testing easier.OperatorEventGateway takes the event, enriches the event with the OperatorID, and forwards it to:
TaskOperatorEventGateway enriches the event with the ExecutionAttemptID and forwards it to the:
JobMasterOperatorEventGateway which is RPC interface from the TaskManager to the JobManager.
*/
public interface TaskOperatorEventGateway { /** * Sends an event from the operator (identified by the given operator ID) to the operator * coordinator (identified by the same ID). */ void sendOperatorEventToCoordinator(OperatorID operator, SerializedValue<OperatorEvent> event); /** * Sends a request from current operator to a specified operator coordinator which is identified * by the given operator ID and return the response. */ CompletableFuture<CoordinationResponse> sendRequestToCoordinator( OperatorID operator, SerializedValue<CoordinationRequest> request);
}
MetadataApplier
org.apache.flink.cdc.common.sink.MetadataApplier
此类负责将表结构修改的事件,转化成为DDL,发送给目标sink端数据库执行。
/** {@code MetadataApplier} is used to apply metadata changes to external systems. */
@PublicEvolving
public interface MetadataApplier extends Serializable { /** Apply the given {@link SchemaChangeEvent} to external systems. */ void applySchemaChange(SchemaChangeEvent schemaChangeEvent) throws SchemaEvolveException; // (10) /** Sets enabled schema evolution event types of current metadata applier. */ default MetadataApplier setAcceptedSchemaEvolutionTypes( Set<SchemaChangeEventType> schemaEvolutionTypes) { return this; } /** Checks if this metadata applier should this event type. */ default boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEventType) { return true; } /** Checks what kind of schema change events downstream can handle. */ default Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() { return Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet()); }
}
DorisMetadataApplier
org.apache.flink.cdc.connectors.doris.sink.DorisMetadataApplier
实现 MetadataApplier
org.apache.flink.cdc.connectors.doris.sink.DorisMetadataApplier#applySchemaChange
:
以:
// (10)
@Override
public void applySchemaChange(SchemaChangeEvent event) throws SchemaEvolveException { try { // send schema change op to doris if (event instanceof CreateTableEvent) { applyCreateTableEvent((CreateTableEvent) event); } else if (event instanceof AddColumnEvent) { applyAddColumnEvent((AddColumnEvent) event); } else if (event instanceof DropColumnEvent) { applyDropColumnEvent((DropColumnEvent) event); } else if (event instanceof RenameColumnEvent) { applyRenameColumnEvent((RenameColumnEvent) event); } else if (event instanceof AlterColumnTypeEvent) { applyAlterColumnTypeEvent((AlterColumnTypeEvent) event); } else { throw new UnsupportedSchemaChangeEventException(event); } } catch (Exception ex) { throw new SchemaEvolveException(event, ex.getMessage(), null); }
}
applyAddColumnEvent举例说明:
这里仅做一些转换
private void applyAddColumnEvent(AddColumnEvent event) throws IOException, IllegalArgumentException { TableId tableId = event.tableId(); List<AddColumnEvent.ColumnWithPosition> addedColumns = event.getAddedColumns(); for (AddColumnEvent.ColumnWithPosition col : addedColumns) { Column column = col.getAddColumn(); FieldSchema addFieldSchema = new FieldSchema( column.getName(), buildTypeString(column.getType()), column.getDefaultValueExpression(), column.getComment()); schemaChangeManager.addColumn( tableId.getSchemaName(), tableId.getTableName(), addFieldSchema); }
}
SchemaChangeManager
org.apache.doris.flink.sink.schema.SchemaChangeManager
org.apache.doris.flink.sink.schema.SchemaChangeManager#addColumn
方法:
SchemaChangeHelper
是拼接SQL用的。schemaChange
方法向数据库发送需要执行的SQL。
public boolean addColumn(String database, String table, FieldSchema field) throws IOException, IllegalArgumentException { if (checkColumnExists(database, table, field.getName())) { LOG.warn( "The column {} already exists in table {}, no need to add it again", field.getName(), table); return true; } String tableIdentifier = getTableIdentifier(database, table); String addColumnDDL = SchemaChangeHelper.buildAddColumnDDL(tableIdentifier, field); return schemaChange( database, table, buildRequestParam(false, field.getName()), addColumnDDL);
}
SchemaChangeHelper
org.apache.doris.flink.sink.schema.SchemaChangeHelper
org.apache.doris.flink.sink.schema.SchemaChangeHelper#buildAddColumnDDL
用ADD_DDL
字符串模板拼接SQL:
// (11)
private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
public static String buildAddColumnDDL(String tableIdentifier, FieldSchema fieldSchema) { String name = fieldSchema.getName(); String type = fieldSchema.getTypeString(); String defaultValue = fieldSchema.getDefaultValue(); String comment = fieldSchema.getComment(); StringBuilder addDDL = new StringBuilder( String.format( ADD_DDL, DorisSchemaFactory.quoteTableIdentifier(tableIdentifier), DorisSchemaFactory.identifier(name), type)); if (defaultValue != null) { addDDL.append(" DEFAULT ").append(DorisSchemaFactory.quoteDefaultValue(defaultValue)); } commentColumn(addDDL, comment); return addDDL.toString();
}
流程总结:
- SchemaOperator接收到SchemaChangeEvent,发送
SchemaChangeRequest
至SchemaRegistry。 - SchemaRegistry内部执行器是SchemaRegistryRequestHandler,简称handler,handler内部持有有状态
schemaChangeStatus
其判断是否正在执行之前的Request,如果是则返回busy状态。如果不是则返回accept状态。其状态修改由RequestStatus.IDLE
为RequestStatus.WAITING_FOR_FLUSH
。 - SchemaOperator如果收到busy状态则sleep后再次发起请求,阻塞直到,收到accept状态,则发送一条
FlushEvent
至下游,之后发送SchemaChangeResultRequest
至SchemaRegistry,等待返回结果如果是SchemaChangeProcessingResponse
则认为SchemaChange还没有结束,sleep后再次发起请求,阻塞直至收到非SchemaChangeProcessingResponse
。此时阻塞,不再发送新的表结构的数据至下游。 - SchemaRegistry收到
SchemaChangeResultRequest
,handler会检查自身状态schemaChangeStatus
,如果不是RequestStatus.FINISHED
,则返回SchemaChangeProcessingResponse
。 - DataSinkWriterOperator收到
FlushEvent
,并执行flush操作,将所有已经收到的老表结构的数据写入数据库。并发送FlushSuccessEvent
给SchemaRegistry。 - SchemaRegistry的handler收集
FlushSuccessEvent
,当收到所有的subtask的FlushSuccessEvent
后,修改自身状态为RequestStatus.APPLYING
。后使用MetadataApplier执行sink端(外)数据库的表结构变更。执行后修改自身状态为RequestStatus.FINISHED
。 - 当SchemaOperator再次发送
SchemaChangeResultRequest
,且SchemaRegistry的handler的状态为RequestStatus.FINISHED
,SchemaRegistry返回给其结果为非SchemaChangeProcessingResponse
,SchemaOperator将不再阻塞,开始将新的表结构的数据继续发送至下游。
二、流程图
下图中的序号已经在源码中表示,可以在源码中搜索。
相关文章:
【Flink CDC】Flink CDC的Schema Evolution表结构演变的源码分析和流程图
Flink CDC版本:3.2.1 说明:本文从SchemaOperator接收到,表结构变更事件开始,表结构变更事件应由source端产生,本文不讨论。 可以先看流程图,研究源码。 参考文章: Flink cdc3.0动态变更表结构—…...
计算机网络 (27)IP多播
前言 IP多播(也称多址广播或组播)技术是一种允许一台或多台主机(多播源)发送单一数据包到多台主机(一次性的、同时的)的TCP/IP网络技术。 一、基本概念 定义:多播作为一点对多点的通信ÿ…...
当算法遇到线性代数(四):奇异值分解(SVD)
SVD分解的理论与应用 线性代数系列相关文章(置顶) 1.当算法遇到线性代数(一):二次型和矩阵正定的意义 2.当算法遇到线性代数(二):矩阵特征值的意义 3.当算法遇到线性代数࿰…...
鸿蒙操作系统(HarmonyOS)
鸿蒙操作系统(HarmonyOS)是华为公司推出的一款面向未来、面向全场景的分布式操作系统。它旨在为用户提供一个更加智能、便捷和安全的操作环境,支持多种终端设备之间的无缝协作。在鸿蒙应用开发中,ArkUI作为官方推荐的用户界面开发…...
通义灵码在跨领域应用拓展之物联网篇
目录 一.引言 二.通义灵码简介 三.通义灵码在物联网领域的设备端应用 1.传感器数据采集 (1).不同类型传感器的数据读取 (2).数据转换与预处理 2.设备控制指令接收和执行 (1).指令解析与处理 (2).设备动作执行 四.通义灵码在物联网领域的云端平台应用 1.数据存储和管…...
CSS语言的数据库交互
CSS语言的数据库交互:一种新潮流的探索 引言 在现代网页开发中,CSS(层叠样式表)无疑是构建优美和响应式网页的重要工具。然而,关于CSS和数据库之间的直接交互,尽管并不是一种常见的做法,却引发…...
[SMARTFORMS] 创建二维码
我们可以使用事务码SE73创建二维码 选择系统条形码,点击"更改"按钮 点击 创建选项 选择"新" 输入二维码名称和简短描述,点击"确认"按钮 选择"QR CODE 2005",点击"确认"按钮 选择"No…...
数据项目相关的AWS云计算架构设计
电商数据平台架构 高性能:使用Amazon EC2的计算优化实例处理业务逻辑和数据计算,搭配Amazon ElastiCache内存缓存,加速数据读取。应用负载均衡器(ALB)在EC2实例间分发流量,实现负载均衡。高可用性…...
智慧农业应用场景|珈和科技高标准农田信息化监管平台解决方案
近年来,珈和科技持续深耕农业领域,深耕农业时空大数据服务。 珈和利用遥感大数据、云计算、移动互联网、物联网、人工智能等先进技术,搭建“天空地一体化”监测体系,并创新建设了150的全球领先算法模型,可为100多种农作…...
C++ operator = 返回void 会发生什么?
1.operator 正常情况 #include <iostream> using namespace std;class Box { public:Box(double L) : length(L) {}Box(const Box& b){}Box& operator (const Box&){return *this;}public:double length; // 长度 };int main() {Box box1(1.0);Box box2(…...
Scala 模式匹配
Scala 模式匹配 引言 Scala 作为一种多范式编程语言,不仅支持面向对象编程,还融合了函数式编程的特性。在其丰富的特性集中,模式匹配(Pattern Matching)尤为引人注目,它是一种在许多编程语言中都有应用的编程范式,但在 Scala 中得到了特别强大的支持。模式匹配允许开发…...
微信小程序中 “页面” 和 “非页面” 的区别
微信小程序中 “页面” 和 “非页面” 的区别,并用表格进行对比。 核心概念: 页面 (Page): 页面是微信小程序中用户可以直接交互的视图层,也是小程序的基本组成部分。每个页面都有自己的 WXML 结构、WXSS 样式和 JavaScript 逻辑…...
解耦Java应用程序的方法和技巧
解耦 Java 应用程序是一项重要的设计原则是减少组件之间的依赖关系,使系统更加模块化、灵活和可维护。通过分离,您可以更轻松地更改、扩展或测试应用程序的各个部分,而不会影响其他部分。 分离 Java 应用程序需要应用减少组件之间直接依赖关系…...
Go语言的 的设计模式(Design Patterns)基础知识
Go语言的设计模式基础知识 引言 设计模式是一种在软件开发中经常使用的解决特定问题的通用方案。它们为开发者提供了一种有效的方式来组织代码、提高代码的可复用性、可维护性和灵活性。在众多编程语言中,Go语言因其独特的特性,如并发支持和简洁的语法…...
【Uniapp-Vue3】常用的表单组件button和input
表单组件中最常用的就是button组件和input组件 一、button组件 常用属性: <template><button>普通按钮</button><button size"mini">小按钮</button><button type"primary">蓝色按钮</button><…...
深入理解非对称加密:用Java实现RSA加解密
目录 一、非对称加密算法概述 二、Java 中实现非对称加密 非对称加密(Asymmetric Encryption)是一种加密方式,其中使用一对密钥:公钥(Public Key)和私钥(Private Key)。这种加密算…...
钓鱼邮件 wp
背景:Bob收到了一份钓鱼邮件,请找出木马的回连地址和端口。 1.打开文件 2.发现base64编码 3.base64解码 将base64编码全部存储在文件 base64.txt 中。 使用命令cat base6.txt | base64 -d | less解码并查看。 关键字: PK 知识点:…...
飞书企业消息实践
一、飞书自带的消息机器人限制 频控策略 - 服务端 API - 飞书开放平台 自定义机器人的频率控制和普通应用不同,为单租户单机器人 100 次/分钟,5 次/秒。建议发送消息尽量避开诸如 10:00、17:30 等整点及半点时间,否则可能出现因系统压力导致…...
逆向安卓抓包
打开Mumu网易,打开设置,打开其他,开启root权限 打开Mumu网易,找到apk安装藏航准备网.apk charles配置:proxy setting 端口9888 查看当地IP:help--->local IP address SSL Proxying Setting--->Add---->IP…...
简单的spring boot tomcat版本升级
简单的spring boot tomcat版本升级 1. 需求 我们使用的springboot版本为2.3.8.RELEASE,对应的tomcat版本为9.0.41,公司tomcat对应版本发现攻击者可发送不完整的POST请求触发错误响应,从而可能导致获取其他用户先前请求的数据,造…...
导出中心设计
业务背景 应用业务经常需要导出数据,但是并发的导出以及不合理的导出参数常常导致应用服务的内存溢出、其他依赖应用的崩溃、导出失败;因此才有导出中心的设计 设计思想 将导出应用所需的内存转移至导出中心,将导出的条数加以限制…...
zookeeper 数据类型
文章目录 引言I Znodezonde stat (状态信息)znode类型临时\永久序列化特性引言 在结构上与标准文件系统非常类似,拥有一个层次的命名空间,都是采用树形层次结构 Zookeeper树中的每个节点被称为:Znode,没有文件和目录之分。Znode兼具文件和目录两种特点Znode存储数据大小有…...
INT305 Machine Learning
W1 Introduction Nearest Neighbor Preliminaries and Nearest Neighbor Methods • Suppose we’re given a novel input vector 𝑥 we’d like to classify. • The idea: find the nearest input vector to 𝑥 in the training set and copy …...
OneFlow和PyTorch在性能上有哪些区别?
OneFlow 和 PyTorch 在性能上的区别主要体现在以下几个方面: 本篇文章的目录 分布式训练性能 硬件利用率和显存优化 模型训练速度 OneFlow:默认采用静态图模式,在模型训练前会对计算图进行编译优化,能够减少运行时的开销&…...
Java高频面试之SE-09
hello啊,各位观众姥爷们!!!本牛马baby今天又来了!哈哈哈哈哈嗝🐶 final关键字有什么作用? 在 Java 中,final 关键字有多个用途,它可以用于类、方法和变量。根据使用的上…...
面向对象(综合练习)
文字版格斗游戏 public class people {private String name;private int xuetiao;public people(String name){this.namename;this.xuetiao100;}public String getName(){return this.name;}public int getXuetiao(){return this.xuetiao;} } import java.util.Random; public…...
《机器学习》——随机森林
文章目录 什么是随机森林?随机森林的原理随机森林的优缺点优点缺点 随机森林模型API主要参数 实例实例步骤导入数据处理数据,切分数据构建模型训练模型测试数据并输出分类报告和混淆矩阵画出模型的前十重要性的特征 扩展 什么是随机森林? -随…...
Synthesia技术浅析(二):虚拟人物视频生成
Synthesia 的虚拟人物视频生成模块是其核心技术之一,能够将文本输入转换为带有同步语音和口型的虚拟人物视频。该模块如下所示: 1.文本输入处理 2.语音生成(TTS, Text-to-Speech) 3.口型同步(Lip Syncing࿰…...
kubelet状态错误报错
journalctl -xeu kubelet 执行后的日志如下: -- -- The process exit code is exited and its exit status is 1. Jan 02 14:20:06 iv-ydipyqxfr4wuxjsij0bd systemd[1]: kubelet.service: Failed with result exit-code. -- Subject: Unit failed -- Defined-By: system…...
全国青少年信息学奥林匹克竞赛(信奥赛)备考实战之循环结构(do-while循环语句)
在C编程中,循环结构是一种用于重复执行某段代码直到特定条件不再满足的控制流语句。除了常见的for循环和while循环之外,C还提供了一种特殊的循环结构——do-while循环。这种循环结构在某些场景下非常有用,特别是在需要至少执行一次循环体的情…...
专访“梦想”号总设计师:如何向11000米深海进发
在我国迈向海洋强国的征程中,“梦想” 号钻探船无疑是一颗闪耀的明星。作为我国自主设计建造的首艘大洋钻探船,它肩负着探索深海奥秘、挖掘地球深部资源的重任,承载着无数科研人员的梦想与期望。其总长 179.8 米、型宽 32.8 米,高…...
有限元分析学习——Anasys Workbanch第一阶段笔记(7)对称问题预备水杯案例分析
目录 1 序言 2 水杯案例 2.1 添加新材料 2.2 水压设置 2.3 约束边界条件设置及其结果 2.3.1 全约束固定(压缩桌面、Fixed support固定水杯底面) 2.3.2 单方面位移约束(压缩桌面、Displacement约束软弹簧) 2.3.3 接触约束(不压缩桌面、Fixed support 固定桌面、Frictional…...
【A-Lab】部署手册:开源AAA解决方案 —FreeRADIUS
由于篇幅限制,完整版请移步至部署手册:开源AAA解决方案 —FreeRADIUS - 星融元Asterfusion 1 软件介绍 2 基础环境 2.1 部署环境 2.2 操作系统基础设置 3 安装配置数据库 4 安装配置Web和PHP 5 安装配置FreeRADIUS 6 安装配置DaloRaDIUS 7 部署结…...
配置管理工具和k8s功能重叠部分的优势比较
通过自动化配置管理工具(如 Ansible、Puppet、Chef)和应用内管理机制,也可以实现自动部署、扩缩容、负载均衡和故障恢复等功能。Kubernetes(K8s)在这些方面具有哪些独特的优势呢,尤其是在云原生环境和大规模…...
五个不同类型的数据库安装
一、 官方首页下载 打开 MySQL 官方首页,链接为: MySQL 进去社区后选择合适的版本进行安装 安装细节 依图一路next 点击finish结束安装 二、 在线YUM仓库 将该安装包的下载链接在 Linux 操作系统中按照以下命令直接进行下载 三、 二进制本地 通过该链接…...
【万字详细教程】Linux to go——装在移动硬盘里的Linux系统(Ubuntu22.04)制作流程;一口气解决系统安装引导文件迁移显卡驱动安装等问题
Linux to go制作流程 0.写在前面 关于教程Why Linux to go?实际效果 1.准备工具2.制作步骤 下载系统镜像硬盘分区准备启动U盘安装系统重启完成驱动安装将系统启动引导程序迁移到移动硬盘上 3.可能出现的问题 3.1.U盘引导系统安装时出现崩溃3.2.不影响硬盘里本身已有…...
LeetCode 704 如何正确书写一个二分查找
题目链接 中文版:https://leetcode.cn/problems/binary-search/description/ 题目描述 给定一个 n 个元素有序的(升序)整型数组 nums 和一个目标值 target ,写一个函数搜索 nums 中的 target,如果目标值存在返回下标…...
LEED绿色建筑认证在2025年相关消息
关于LEED绿色建筑认证在2025年的相关消息,以下是一些关键信息: 一、认证体系的重要性与影响力 LEED(能源与环境设计先锋奖)评估体系是全球最具影响力的绿色建筑认证标准之一。中国已成为全球第二大LEED认证市场,显示…...
百度Android最新150道面试题及参考答案 (中)
Android 中一个 View 的显示渲染过程,自定义 View 的时候需要避免什么操作? 一、View 的显示渲染过程 测量(Measure)阶段 这个阶段是 View 渲染的第一步。父容器会调用子 View 的measure()方法来确定子 View 的大小。measure()方法会传入两个参数,即MeasureSpec(测量规格…...
详解Redis的Hash类型及相关命令
目录 HSET HGET HEXISTS HDEL HKEYS HVALS HGETALL HMGET HLEN HSETNX HINCRBY HINCRBYFLOAT 内部编码 应用场景 HSET 设置 hash 中指定的字段(field)的值(value)。 语法 HSET key field value [field value ...] 时…...
会议活动管理系统django
完整源码项目包获取→点击文章末尾名片!...
【数据结构与算法:七、查找】
第7章 查找 7.1 查找的基本概念 查找是通过某种算法,在一个给定的数据集合中找到目标元素或判断目标元素是否存在的操作。查找效率直接决定了程序性能,尤其在大数据处理场景下,合理选择查找算法至关重要。 查找的基本步骤 确定查找表结构…...
如何使用SparkSql
一、SparkSql的前世今生 Hive->Shark->Spark SQL 二、SparkSql依赖 <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.1.2</version> </dependency> 三、…...
沃尔玛购物卡查询
在平常生活里,沃尔玛卡可不只是一张购物的凭据,它是连着心意的绳子,是让生活方便的小幸福。每次拿着沃尔玛卡走进熟悉的超市大门,心里就会有一点期待和满足。可随着时间过去,可能有些卡就静静躺在抽屉角落,…...
Markdown中流程图的用法
Markdown中流程图图的用法 前言语法详解基本流程图几何图形节点默认的节点分离节点的ID与内容节点形状圆角形节点的语法圆形节点的语法。非对称节点语法菱形节点的语法。六角形节点的语法。平行四边形节点的语法。梯形节点的语法。 连接线基本的连接线语法。无向线段连接线。点…...
Linux的proc目录与什么有关?【以及它里面的文件各自记录着什么信息】
在 Linux 系统中,/proc 目录是一个虚拟文件系统,提供了关于内核、进程和系统状态的实时信息。它与系统的 内核 和 进程 运行状态紧密相关,是系统管理员、开发人员和用户了解系统运行状况的重要途径。 /proc 目录的名称来源于 “process”&am…...
无监督学习入门指南:从原理到实践
目录 1 无监督学习基础 1.1 核心目标 1.2 理论基础 1.3 应用层面 2 聚类分析 2.1 相似性度量 2.2 聚类算法 2.2.1 K-均值聚类 2.2.2 密度聚类:DBSCAN 2.2.3 谱聚类 3 降维技术 3.1 线性 3.2 非线性 3.3 降维技术面临关键问题 4 概率密度估计 4.1 参…...
(MTK平台mt8168)通过i2c调试外接MCU管理外接电源项目
这个项目是我几年前在mtk方案公司调试的一个比较具有综合性的项目,涉及到知识点有很多,我个人认为算是一个很经典的一个项目,当然这个是对技术人员而讲。我大概总结一下,涉及到i2c,kernel中的timer_list,示波器和逻辑分析仪的使用,还有i2c硬件上的原理,如果host断采用3…...
计算机网络——网络层—路由算法和路由协议
一、因特网的路由选择协议 • 不存在一种绝对的最佳路由算法。 • 所谓“最佳”只能是相对于某一种特定要求下得出的较为合理的选择而已。 • 实际的路由选择算法,应尽可能接近于理想的算法。 • 路由选择是个非常复杂的问题 • 它是网络中的所有结点共同协调工…...
WPS计算机二级•数据查找分析
听说这里是目录哦 通配符🌌问号(?)星号(*)波形符(~) 排序🌠数字按大小排序以当前选定区域排序以扩展选定区域排序 文字按首字母排序 快速筛选分类数据☄️文字筛选数字筛选颜色筛选…...