从曲线碎片中收集拼图

Collect puzzle from curve fragments

我有一个数据,它由许多块组成。我现在知道它们来自一些连续的曲线,但后来在 y 方向上移动了。现在我想将它们移回以估计原始曲线。有些部分没有移动,只是不存在。为了澄清情况,生成类似内容的虚拟代码如下(Matlab):

%% generate some dummy data
knots = rand(10,2);
% fix starting and stop points
knots = [[0,rand()];knots;[1,rand()]];
% sort knots
knots=unique(knots,'rows');
% generate dummy curve
dummyX = linspace(0,1,10^4);
dummyY = interp1(knots(:,1),knots(:,2),dummyX,'spline');
figure()
subplot(2,1,1)
plot(dummyX,dummyY)
%% Add offset and wipe some parts
% get borders of chunks
borders = [1;randi([1,numel(dummyX)],20,1);numel(dummyX)];
borders = unique(borders);
borders = [borders(1:end-1)+1,borders(2:end)];
borders(1) = 1;
% add ofsets or nans
offset = (rand(size(borders,1),1)-0.5)*5;
offset(randperm(numel(offset),floor(size(borders,1)/3)))=nan;
for iBorder = 1:size(borders,1)
   idx = borders(iBorder,1): borders(iBorder,2);
   dummyY(idx)=dummyY(idx)+offset(iBorder);
   dummyY(idx([1,end]))=nan;
end
subplot(2,1,2)
plot(dummyX,dummyY)

原来的曲线在上面,在下面移动了。我尝试成对移动块,最小化三次样条的长度,但它对我不起作用。我明白,不可能获得完全相同的曲线(我可能会丢失一些峰)。

你能帮我找到最好的班次吗?

我对此有几个想法,并尝试了整体曲率、弧长等以及混合组合。事实证明,一个简单的 chi**2 效果最好。所以它就这么简单:

  1. 通过样条
  2. 获得一些节点以适合具有给定精度的每个块
  3. 加入一切
  4. 减少节点以避免接触组中的节点非常接近,这些节点会导致大曲率。
  5. 在整个集合上使用 leastsq 拟合,在连接的和减少的节点集合上使用样条曲线来查找偏移。

理论上可以玩/修改:

  • 样条顺序
  • 最小结密度
  • 最大结密度
  • 如何处理相邻集合
  • 在较大的间隙上打结
  • 等等

注意:在一些随机数据中,splrev 产生了错误消息。由于这些大多不是很有用,我只能说这段代码不是 100% 可靠的。)

代码如下

import matplotlib.pyplot as plt
import numpy as np
from scipy.interpolate import interp1d, splrep, splev
from scipy.optimize import fmin, leastsq

def reduce_knots( inList, dist ):
    outList=[]
    addList=[]
    for i in inList:
        try:
            if abs( i - addList[ -1 ] ) < dist:
                addList += [ i ]
            else:
                outList += [ addList ]
                addList = [ i ]
        except IndexError:### basically the first
            addList = [ i]
    outList += [ addList ]
    return [ sum( x ) / len( x ) for x in outList ]

def adaptive_knots( inX, inY, thresh=.005 ):
    ll = len( inX )
    sup = ll - 4
    assert sup > 3
    nN = 3
    test = True
    while test:
        testknots = np.linspace( 1, len( inX ) - 2, nN, dtype=np.int )
        testknots = [ inX[ x ] for x in testknots ]
        myTCK= splrep( inX , inY, t=testknots )
        newY = splev( inX , myTCK )
        chi2 = np.sum( ( newY - inY )**2 ) / ll
        if chi2 > thresh:
            nN += 1
            if nN > sup:
                test = False
        else:
            test = False
    return testknots

def global_residuals( shiftList, xBlocks, yBlocks, allTheKnots ):# everything shifted (1 is redundant by global offset) Blocks must be ordered an np.arrays
    localYBlocks = [ s + yList for s, yList in zip( shiftList, yBlocks ) ]
    allTheX = np.concatenate( xBlocks )
    allTheY = np.concatenate( localYBlocks )
    tck = splrep( allTheX, allTheY, t=allTheKnots )
    yList  = splev( allTheX, tck )
    diff = yList - allTheY
    return diff

#~ np.random.seed( 28561 )
np.random.seed( 5561 )
#~ np.random.seed( 733437 )

### python way for test data
knots = np.random.rand( 8, 2 )
knots = np.array( sorted( [ [ 0, np.random.rand() ] ] + list( knots ) + [ [ 1, np.random.rand() ] ], key=lambda x: x[ 0 ] ) )
dummyX = np.linspace( 0, 1, 3e4 )
f = interp1d( knots[ :, 0 ], knots[ :, 1 ], 'cubic' )
dummyY = np.fromiter( ( f( x ) for x in dummyX ), np.float )
chunk = np.append( [ 0 ], np.append( np.sort( np.random.randint( 7, high=len( dummyX ) - 10 , size= 10, dtype=np.int ) ), len( dummyX ) ) )

xDataDict = dict()
yDataDict = dict()
allX = np.array( [] )
allY = np.array( [] )
allK = np.array( [] )
allS = []

for i, val in enumerate(chunk[ : -1 ] ):
    if np.random.rand() < .75: ## 25% of not appearing
        xDataDict[ i ] = dummyX[ val:chunk[ i + 1 ] ]
        realShift = 1.5 * ( 1 - 2 * np.random.rand() )
        allS += [ realShift ]
        yDataDict[ i ] = dummyY[ val:chunk[ i + 1 ] ] + realShift
        yDataDict[ i ] = np.fromiter( ( np.random.normal( scale=.05, loc=y ) for y in yDataDict[ i ] ), np.float )
        allX = np.append( allX, xDataDict[ i ] )
        allY = np.append( allY, yDataDict[ i ] )

### Plotting
fig = plt.figure()
ax = fig.add_subplot( 3, 1, 1 )
ax.plot( knots[ :, 0 ],knots[ :, 1 ], ls='', c='r', marker='o')
ax.plot( dummyX , dummyY, '--' )
for key in xDataDict.keys():
    ax.plot(xDataDict[ key ], yDataDict[ key ] )
    myKnots = adaptive_knots( xDataDict[ key ], yDataDict[ key ] )
    allK = np.append( allK, myKnots )
    myTCK = splrep( xDataDict[ key ], yDataDict[ key ], t=myKnots )
    ax.plot( xDataDict[ key ], splev( xDataDict[ key ] , myTCK ) )
myTCK = splrep( allX, allY, t=allK )
ax.plot( allX, splev( allX, myTCK ) )
for x in allK:
    ax.axvline( x=x, linestyle=':', color='#AAAAAA', linewidth=1 )

### now fitting
myXBlockList = []
myYBlockList = []
for key in sorted( xDataDict.keys() ):
     myXBlockList += [ xDataDict[ key ] ]
     myYBlockList += [ yDataDict[ key ] ]

#start values
s = [ 0 ]
for i,y in enumerate( myYBlockList[ :-1 ] ):
    ds = myYBlockList[ i + 1 ][ 0 ] - y[ -1 ]
    s += [ -ds ]
startShift = np.cumsum( s )

allK = reduce_knots( allK, .01 )

sol, ierr = leastsq( global_residuals, x0=startShift, args=( myXBlockList, myYBlockList, allK ), maxfev=10000 )
sol = np.array(sol) - sol[ 0 ]
print "solution: ", -sol
print "real: ", np.array( allS ) - allS[ 0 ]

### Plotting solutions
bx = fig.add_subplot( 3, 1, 3, sharex=ax )
for x, y, s in zip( myXBlockList, myYBlockList, sol ):
    bx.plot( x, y + s )

localYBlocks = [ s + yList for s,yList in zip( sol, myYBlockList ) ]
allTheX = np.concatenate( myXBlockList )
allTheY = np.concatenate( localYBlocks )
tck = splrep( allTheX, allTheY, t=allK )
dx = allTheX[ 1 ] - allTheX[ 0 ]
testX = np.arange( allTheX[ 0 ], allTheX[ -1 ], dx )
finalyList  = splev( testX, tck)
bx.plot( testX, finalyList , 'k--' )

mean = sum( dummyY ) / len( dummyY ) - sum( finalyList ) / len( finalyList )

bx.plot( dummyX, dummyY - mean, '--' )
for x in allK:
    bx.axvline( x=x, linestyle=':', color='#AAAAAA', linewidth=1 )

cx = fig.add_subplot( 3, 1, 2, sharex=ax )
for x, y, s in zip( myXBlockList, myYBlockList, startShift ):
    cx.plot( x, y + s )

plt.show()

对于小间隙,这在测试数据上效果很好

上图将原始样条及其节点显示为红点。这生成了数据。此外,它显示了嘈杂的移位块,初始拟合结为垂直线和相应的样条拟合。 中图显示按 pre-calculated 起始值移动的块 - 对齐的末端。 下图显示原始样条、拟合样条、减少的结位置和根据拟合解决方案移动的块。

当然,差距越大,解越偏离原来的

...不过还是很不错的。