Flink - State Management
Published: 2019-06-09


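The overall checkpoint flow has been described elsewhere, but how a snapshot is generated and how it is restored was not covered in detail. This post fills in that part.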

 

StreamOperator

/**
 * Basic interface for stream operators. Implementers would implement one of
 * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} or
 * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator} to create operators
 * that process elements.
 *
 * <p>The class {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}
 * offers default implementation for the lifecycle and properties methods.
 *
 * <p>Methods of {@code StreamOperator} are guaranteed not to be called concurrently. Also, if using
 * the timer service, timer callbacks are also guaranteed not to be called concurrently with
 * methods on {@code StreamOperator}.
 *
 * @param <OUT> The output type of the operator
 */
public interface StreamOperator<OUT> extends Serializable {

    // ------------------------------------------------------------------------
    //  life cycle
    // ------------------------------------------------------------------------

    /**
     * Initializes the operator. Sets access to the context and the output.
     */
    void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output);

    /**
     * This method is called immediately before any elements are processed, it should contain the
     * operator's initialization logic.
     *
     * @throws java.lang.Exception An exception in this method causes the operator to fail.
     */
    void open() throws Exception;

    /**
     * This method is called after all records have been added to the operators via the methods
     * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator#processElement(StreamRecord)}, or
     * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement1(StreamRecord)} and
     * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement2(StreamRecord)}.
     *
     * <p>The method is expected to flush all remaining buffered data. Exceptions during this flushing
     * of buffered data should be propagated, in order to cause the operation to be recognized as failed,
     * because the last data items are not processed properly.
     *
     * @throws java.lang.Exception An exception in this method causes the operator to fail.
     */
    void close() throws Exception;

    /**
     * This method is called at the very end of the operator's life, both in the case of a successful
     * completion of the operation, and in the case of a failure and canceling.
     *
     * This method is expected to make a thorough effort to release all resources
     * that the operator has acquired.
     */
    void dispose();

    // ------------------------------------------------------------------------
    //  state snapshots
    // ------------------------------------------------------------------------

    /**
     * Called to draw a state snapshot from the operator. This method snapshots the operator state
     * (if the operator is stateful) and the key/value state (if it is being used and has been
     * initialized).
     *
     * @param checkpointId The ID of the checkpoint.
     * @param timestamp The timestamp of the checkpoint.
     *
     * @return The StreamTaskState object, possibly containing the snapshots for the
     *         operator and key/value state.
     *
     * @throws Exception Forwards exceptions that occur while drawing snapshots from the operator
     *                   and the key/value state.
     */
    StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception;

    /**
     * Restores the operator state, if this operator's execution is recovering from a checkpoint.
     * This method restores the operator state (if the operator is stateful) and the key/value state
     * (if it had been used and was initialized when the snapshot occurred).
     *
     * <p>This method is called after {@link #setup(StreamTask, StreamConfig, Output)}
     * and before {@link #open()}.
     *
     * @param state The state of operator that was snapshotted as part of checkpoint
     *              from which the execution is restored.
     *
     * @param recoveryTimestamp Global recovery timestamp
     *
     * @throws Exception Exceptions during state restore should be forwarded, so that the system can
     *                   properly react to failed state restore and fail the execution attempt.
     */
    void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception;

    /**
     * Called when the checkpoint with the given ID is completed and acknowledged on the JobManager.
     *
     * @param checkpointId The ID of the checkpoint that has been completed.
     *
     * @throws Exception Exceptions during checkpoint acknowledgement may be forwarded and will cause
     *                   the program to fail and enter recovery.
     */
    void notifyOfCompletedCheckpoint(long checkpointId) throws Exception;

    // ------------------------------------------------------------------------
    //  miscellaneous
    // ------------------------------------------------------------------------

    void setKeyContextElement(StreamRecord<?> record) throws Exception;

    /**
     * An operator can return true here to disable copying of its input elements. This overrides
     * the object-reuse setting on the {@link org.apache.flink.api.common.ExecutionConfig}
     */
    boolean isInputCopyingDisabled();

    ChainingStrategy getChainingStrategy();

    void setChainingStrategy(ChainingStrategy strategy);
}

The following pair of methods is responsible for taking a snapshot of the operator's state and for restoring that state:

StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception;

void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception;
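The contract of this pair can be illustrated with a small, self-contained sketch. CountingOperator and CountSnapshot below are hypothetical classes written for this example and are not part of Flink; they only mirror the idea that snapshotOperatorState captures the state by value and restoreState puts it back into a fresh operator instance.

```java
import java.io.Serializable;

/** Hypothetical toy operator for illustration only; not a Flink class. */
final class CountingOperator {

    /** Hypothetical snapshot holder, playing the role that StreamTaskState plays in the text. */
    static final class CountSnapshot implements Serializable {
        private static final long serialVersionUID = 1L;
        final long checkpointId;
        final long count;

        CountSnapshot(long checkpointId, long count) {
            this.checkpointId = checkpointId;
            this.count = count;
        }
    }

    private long count;

    /** The only "processing" this operator does is counting elements. */
    void processElement(String element) {
        count++;
    }

    /** Captures the current state by value; the timestamp is accepted but unused in this sketch. */
    CountSnapshot snapshotOperatorState(long checkpointId, long timestamp) {
        return new CountSnapshot(checkpointId, count);
    }

    /** Replaces the current state with the snapshotted one. */
    void restoreState(CountSnapshot state) {
        this.count = state.count;
    }

    long getCount() {
        return count;
    }
}
```

Elements processed after the snapshot was taken are not part of it, so an operator restored from that snapshot resumes from the count at snapshot time.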

 

First, note that both generating and restoring a snapshot use StreamTaskState as the exchange type.

public class StreamTaskState implements Serializable, Closeable {

    private static final long serialVersionUID = 1L;

    private StateHandle<?> operatorState;

    private StateHandle<Serializable> functionState;

    private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates;

    // ... (rest of the class omitted)
}

As shown, StreamTaskState wraps three kinds of state: the operator state, the function state, and the key/value states.

AbstractStreamOperator: for now only the kvstate case is considered; the other cases are simpler.

@Override
public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
    // here, we deal with key/value state snapshots

    StreamTaskState state = new StreamTaskState();

    if (stateBackend != null) {
        HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> partitionedSnapshots =
            stateBackend.snapshotPartitionedState(checkpointId, timestamp);
        if (partitionedSnapshots != null) {
            state.setKvStates(partitionedSnapshots);
        }
    }

    return state;
}

@Override
@SuppressWarnings("rawtypes,unchecked")
public void restoreState(StreamTaskState state) throws Exception {
    // restore the key/value state. the actual restore happens lazily, when the function requests
    // the state again, because the restore method needs information provided by the user function
    if (stateBackend != null) {
        stateBackend.injectKeyValueStateSnapshots((HashMap) state.getKvStates());
    }
}

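As you can see, Flink 1.1.0 simplifies this logic compared with earlier versions: the work is abstracted away into the stateBackend. Note that restoreState here takes only the StreamTaskState, unlike the two-argument signature in the StreamOperator listing above.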

 

AbstractStateBackend
/**
 * A state backend defines how state is stored and snapshotted during checkpoints.
 */
public abstract class AbstractStateBackend implements java.io.Serializable {

    protected transient TypeSerializer<?> keySerializer;

    protected transient ClassLoader userCodeClassLoader;

    protected transient Object currentKey;

    /** For efficient access in setCurrentKey() */
    private transient KvState<?, ?, ?, ?, ?>[] keyValueStates; // array form, convenient for fast iteration

    /** So that we can give out state when the user uses the same key. */
    protected transient HashMap<String, KvState<?, ?, ?, ?, ?>> keyValueStatesByName; // the kvState recorded under each map key

    /** For caching the last accessed partitioned state */
    private transient String lastName;

    @SuppressWarnings("rawtypes")
    private transient KvState lastState;

    // ... (rest of the class omitted)

 

stateBackend.snapshotPartitionedState

public HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshotPartitionedState(long checkpointId, long timestamp) throws Exception {
    if (keyValueStates != null) {
        HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshots = new HashMap<>(keyValueStatesByName.size());

        for (Map.Entry<String, KvState<?, ?, ?, ?, ?>> entry : keyValueStatesByName.entrySet()) {
            KvStateSnapshot<?, ?, ?, ?, ?> snapshot = entry.getValue().snapshot(checkpointId, timestamp);
            snapshots.put(entry.getKey(), snapshot);
        }
        return snapshots;
    }

    return null;
}

The logic is simple: for every cached kvstate, create a snapshot and put it into HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshots.

 

stateBackend.injectKeyValueStateSnapshots is simply the reverse of the process above.

/**
 * Injects K/V state snapshots for lazy restore.
 * @param keyValueStateSnapshots The Map of snapshots
 */
@SuppressWarnings("unchecked,rawtypes")
public void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots) throws Exception {
    if (keyValueStateSnapshots != null) {
        if (keyValueStatesByName == null) {
            keyValueStatesByName = new HashMap<>();
        }

        for (Map.Entry<String, KvStateSnapshot> state : keyValueStateSnapshots.entrySet()) {
            KvState kvState = state.getValue().restoreState(this, keySerializer, userCodeClassLoader);
            keyValueStatesByName.put(state.getKey(), kvState);
        }
        keyValueStates = keyValueStatesByName.values().toArray(new KvState[keyValueStatesByName.size()]);
    }
}
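The symmetry between snapshotPartitionedState and injectKeyValueStateSnapshots can be sketched in plain Java. NamedStateRegistry, ListState, and ListSnapshot are hypothetical stand-ins invented for this illustration (they are not Flink classes, and the real KvState and KvStateSnapshot interfaces carry more type parameters and arguments). The sketch only shows the pattern: one snapshot per named state in one direction, one rebuilt state per named snapshot in the other.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a backend that tracks its states by name. */
final class NamedStateRegistry {

    /** Hypothetical stand-in for a KvState: a mutable list of values. */
    static final class ListState {
        final List<String> values = new ArrayList<>();

        /** Copies the current contents so later writes do not affect the snapshot. */
        ListSnapshot snapshot() {
            return new ListSnapshot(new ArrayList<>(values));
        }
    }

    /** Hypothetical stand-in for a KvStateSnapshot: a private copy of the values. */
    static final class ListSnapshot {
        private final List<String> copy;

        ListSnapshot(List<String> copy) {
            this.copy = copy;
        }

        /** Rebuilds a live state object from the snapshot. */
        ListState restoreState() {
            ListState state = new ListState();
            state.values.addAll(copy);
            return state;
        }
    }

    final Map<String, ListState> statesByName = new HashMap<>();

    /** Snapshot direction: one snapshot per registered state, keyed by the state's name. */
    Map<String, ListSnapshot> snapshotAll() {
        Map<String, ListSnapshot> snapshots = new HashMap<>(statesByName.size());
        for (Map.Entry<String, ListState> entry : statesByName.entrySet()) {
            snapshots.put(entry.getKey(), entry.getValue().snapshot());
        }
        return snapshots;
    }

    /** Restore direction: rebuild each state from its snapshot and register it under the same name. */
    void injectAll(Map<String, ListSnapshot> snapshots) {
        for (Map.Entry<String, ListSnapshot> entry : snapshots.entrySet()) {
            statesByName.put(entry.getKey(), entry.getValue().restoreState());
        }
    }
}
```

Because each snapshot holds its own copy, values written to a state after snapshotAll() returns do not show up in a registry that is later populated through injectAll().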

 

Now let's look at the concrete snapshot and restore logic of FsState.

AbstractFsState.snapshot

@Override
public KvStateSnapshot<K, N, S, SD, FsStateBackend> snapshot(long checkpointId, long timestamp) throws Exception {
    try (FsStateBackend.FsCheckpointStateOutputStream out =
            backend.createCheckpointStateOutputStream(checkpointId, timestamp)) {

        // serialize the state to the output stream
        DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(new DataOutputStream(out));
        outView.writeInt(state.size());
        for (Map.Entry<N, Map<K, SV>> namespaceState : state.entrySet()) {
            N namespace = namespaceState.getKey();
            namespaceSerializer.serialize(namespace, outView);
            outView.writeInt(namespaceState.getValue().size());
            for (Map.Entry<K, SV> entry : namespaceState.getValue().entrySet()) {
                keySerializer.serialize(entry.getKey(), outView);
                stateSerializer.serialize(entry.getValue(), outView);
            }
        }
        outView.flush(); // the actual content is flushed to the file

        // create a handle to the state
        return createHeapSnapshot(out.closeAndGetPath()); // the snapshot itself only needs the path
    }
}

 

createCheckpointStateOutputStream

@Override
public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
    checkFileSystemInitialized();

    Path checkpointDir = createCheckpointDirPath(checkpointID); // build the file path from the checkpointId
    int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
    return new FsCheckpointStateOutputStream(checkpointDir, filesystem, bufferSize, fileStateThreshold);
}

 

FsCheckpointStateOutputStream

It wraps the write, flush, and closeAndGetPath methods. The flush method looks like this:

public void flush() throws IOException {
    if (!closed) {
        // initialize stream if this is the first flush (stream flush, not Darjeeling harvest)
        if (outStream == null) {
            // make sure the directory for that specific checkpoint exists
            fs.mkdirs(basePath);

            Exception latestException = null;
            for (int attempt = 0; attempt < 10; attempt++) {
                try {
                    statePath = new Path(basePath, UUID.randomUUID().toString());
                    outStream = fs.create(statePath, false);
                    break;
                }
                catch (Exception e) {
                    latestException = e;
                }
            }

            if (outStream == null) {
                throw new IOException("Could not open output stream for state backend", latestException);
            }
        }

        // now flush
        if (pos > 0) {
            outStream.write(writeBuffer, 0, pos);
            pos = 0;
        }
    }
}
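The pattern in flush() (buffer writes in memory, create the file only on the first flush, and retry with a fresh random name if creation fails) can be tried out against the local file system. The sketch below is a simplified analogue, not the Flink class: LazyFileOutputStream is a hypothetical name, it uses java.nio.file instead of Flink's FileSystem, it assumes a positive buffer size, and it leaves out the closed flag and the fileStateThreshold handling.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.UUID;

/** Hypothetical, simplified analogue of a checkpoint output stream on the local file system. */
final class LazyFileOutputStream extends OutputStream {
    private final Path baseDir;
    private final byte[] buffer;   // assumption: bufferSize > 0
    private int pos;
    private OutputStream out;      // stays null until the first flush
    private Path statePath;

    LazyFileOutputStream(Path baseDir, int bufferSize) {
        this.baseDir = baseDir;
        this.buffer = new byte[bufferSize];
    }

    @Override
    public void write(int b) throws IOException {
        if (pos >= buffer.length) {
            flush();               // buffer is full, spill it to the file
        }
        buffer[pos++] = (byte) b;
    }

    @Override
    public void flush() throws IOException {
        if (out == null) {
            // first flush: make sure the directory exists, then open a uniquely named file
            Files.createDirectories(baseDir);
            IOException latest = null;
            for (int attempt = 0; attempt < 10 && out == null; attempt++) {
                try {
                    Path candidate = baseDir.resolve(UUID.randomUUID().toString());
                    // CREATE_NEW fails if the file already exists, so nothing is overwritten
                    out = Files.newOutputStream(candidate,
                            StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
                    statePath = candidate;
                } catch (IOException e) {
                    latest = e;
                }
            }
            if (out == null) {
                throw new IOException("Could not open output stream", latest);
            }
        }
        if (pos > 0) {
            out.write(buffer, 0, pos);
            pos = 0;
        }
    }

    /** Flushes the remaining bytes, closes the file, and returns the path that was written. */
    Path closeAndGetPath() throws IOException {
        flush();
        out.close();
        return statePath;
    }
}
```

One consequence of the lazy open is that a stream which never fills its buffer touches the file system only once, when it is flushed or closed.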

 

AbstractFsStateSnapshot.restoreState

@Override
public KvState<K, N, S, SD, FsStateBackend> restoreState(
        FsStateBackend stateBackend,
        final TypeSerializer<K> keySerializer,
        ClassLoader classLoader) throws Exception {

    // state restore
    ensureNotClosed();

    try (FSDataInputStream inStream = stateBackend.getFileSystem().open(getFilePath())) {
        // make sure the in-progress restore from the handle can be closed
        registerCloseable(inStream);

        DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inStream);

        final int numKeys = inView.readInt();
        HashMap<N, Map<K, SV>> stateMap = new HashMap<>(numKeys);

        for (int i = 0; i < numKeys; i++) {
            N namespace = namespaceSerializer.deserialize(inView);
            final int numValues = inView.readInt();
            Map<K, SV> namespaceMap = new HashMap<>(numValues);
            stateMap.put(namespace, namespaceMap);
            for (int j = 0; j < numValues; j++) {
                K key = keySerializer.deserialize(inView);
                SV value = stateSerializer.deserialize(inView);
                namespaceMap.put(key, value);
            }
        }

        return createFsState(stateBackend, stateMap);
    }
    catch (Exception e) {
        throw new Exception("Failed to restore state from file system", e);
    }
}
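AbstractFsState.snapshot and AbstractFsStateSnapshot.restoreState agree on a length-prefixed layout: the number of namespaces, then for each namespace its value, the number of entries, and the key/value pairs. The following sketch reproduces that layout with the JDK's DataOutputStream and DataInputStream. NestedMapCodec is a hypothetical helper written for this example, and String/Long with writeUTF/writeLong stand in for Flink's pluggable namespace, key, and state serializers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper showing the namespace -> (key -> value) layout; not a Flink class. */
final class NestedMapCodec {

    /** Writes: namespace count, then per namespace: name, entry count, key/value pairs. */
    static byte[] write(Map<String, Map<String, Long>> state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(state.size());
            for (Map.Entry<String, Map<String, Long>> ns : state.entrySet()) {
                out.writeUTF(ns.getKey());
                out.writeInt(ns.getValue().size());
                for (Map.Entry<String, Long> e : ns.getValue().entrySet()) {
                    out.writeUTF(e.getKey());
                    out.writeLong(e.getValue());
                }
            }
        }
        return bytes.toByteArray();
    }

    /** Reads the fields back in exactly the order in which write() emitted them. */
    static Map<String, Map<String, Long>> read(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int numNamespaces = in.readInt();
            Map<String, Map<String, Long>> state = new HashMap<>(numNamespaces);
            for (int i = 0; i < numNamespaces; i++) {
                String namespace = in.readUTF();
                int numEntries = in.readInt();
                Map<String, Long> entries = new HashMap<>(numEntries);
                for (int j = 0; j < numEntries; j++) {
                    String key = in.readUTF();
                    long value = in.readLong();
                    entries.put(key, value);
                }
                state.put(namespace, entries);
            }
            return state;
        }
    }
}
```

The size prefixes are what let the reader know how many entries to consume, so the format needs no delimiters and an empty namespace costs only its name plus one int.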

Reposted from: https://www.cnblogs.com/fxjwind/p/6103314.html
